Elasticsearch文档操作

新建文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
PUT my_demo/_doc/1
{
"id": 1,
"user" : "hu",
"post_date" : "2021-12-15T15:22:32",
"message" : "dsgasdgsdg"
}

#自动生成索引
PUT my_demo/_doc/
{
"id": 1,
"user" : "hu",
"post_date" : "2021-12-15T15:22:32",
"message" : "dsgasdgsdg"
}

测试:

创建一个索引:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
PUT my_demo
{
"settings" : {
"index" : {
"number_of_shards" : 3,
"number_of_replicas" : 2
}
},
"mappings" : {
"_doc" : {
"properties" : {
"user" : { "type" : "text" },
"post_date" : { "type" : "text" },
"message" : { "type" : "text" }
}
}
}
}

建立文档:

1
2
3
4
5
6
7
PUT  /my_demo/_doc/1
{
"id": 1,
"user" : "hu",
"post_date" : "2021-12-15T15:22:32",
"message" : "dsgasdgsdg"
}

返回值说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"_index" : "my_demo",
"_type" : "_doc", #所属mapping type
"_id" : "1", #文档id
"_version" : 1,
"result" : "created",
"_shards" : { #分片写入情况
"total" : 3,#所在分片有三个副本
"successful" : 1, #一个副本上成功写入
"failed" : 0 #失败副本数
},
"_seq_no" : 0, #第几次操作该文档
"_primary_term" : 1 #词项数
}

获取单个文档

HEAD my_demo/_doc/1

GET my_demo/_doc/1

不获取文档的source: GET my_demo/_doc/1?_source=false

获取文档的source:GET twitter/_doc/1/_source

获取多个文档 _mget

多种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
GET /_mget
{
"docs" : [
{
"_index" : "my_demo",
"_type" : "_doc",
"_id" : "1"
},
{
"_index" : "my_demo1",
"_type" : "_doc",
"_id" : "2"
"stored_fields" : ["field3", "field4"]
}
]
}

GET /my_demo/_mget
{
"docs" : [
{
"_type" : "_doc",
"_id" : "1"
},
{
"_type" : "_doc",
"_id" : "2"
}
]
}


GET /my_demo/_doc/_mget
{
"docs" : [
{
"_id" : "1"
},
{
"_id" : "2"
}
]
}

GET /my_demo/_doc/_mget
{
"ids" : ["1", "2"]
}

删除文档

指定文档id进行删除 DELETE my_demo/_doc/1

用版本来控制删除 DELETE my_demo/_doc/1?version=1

查询删除

1
2
3
4
5
6
7
8
POST my_demo/_delete_by_query
{
"query": {
"match": {
"message": "4654515"
}
}
}

当有文档有版本冲突时,不放弃删除操作(记录冲突的文档,继续删除其他复合查询的文档)

1
2
3
4
5
6
7
8
9
10
11
POST my_demo/_doc/_delete_by_query?conflicts=proceed
{
"query": {
"match_all": {}
}
}

通过task api 来查看 查询删除任务
GET _tasks?detailed=true&actions=*/delete/byquery
查询具体任务的状态GET /_tasks/taskId:1
取消任务 POST _tasks/task_id:1/_cancel

更新文档

指定文档id进行修改

1
2
3
4
5
6
7
PUT my_demo/_doc/1
{
"id": 1,
"user" : "hu",
"post_date" : "fsadfs",
"message" : "sadgsd"
}

乐观锁并发更新控制

1
2
3
4
5
6
7
PUT my_demo/_doc/1?version=1
{
"id": 1,
"user" : "yun",
"post_date" : "adfasdfa",
"message" : "ewetgege"
}

通过脚本来更新文档

对文档的counter + 1

1
2
3
4
5
6
7
8
9
10
POST my_demo/_doc/1/_update
{
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 1
}
}
}

往数组中加入元素

1
2
3
4
5
6
7
8
9
10
POST my_demo/_doc/1/_update
{
"script" : {
"source": "ctx._source.tags.add(params.tag)",
"lang": "painless",
"params" : {
"tag" : "blue"
}
}
}

painless是es内置的一种脚本语言,ctx执行上下文对象(通过它还可访问_index, _type, _id, _version, _routing and _now (the current timestamp) ),params是参数集合

脚本更新要求索引的_source 字段是启用的。更新执行流程:

1
2
3
4
5
a、获取到原文档
b、通过_source字段的原始数据,执行脚本修改。
c、删除原索引文档
d、索引修改后的文档
它只是降低了一些网络往返,并减少了get和索引之间版本冲突的可能性。

其他操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
添加一个字段
POST my_demo/_doc/1/_update
{
"script" : "ctx._source.new_field = 'value_of_new_field'"
}
移除一个字段
POST my_demo/_doc/1/_update
{
"script" : "ctx._source.remove('new_field')"
}
判断删除或不做什么
POST my_demo/_doc/1/_update
{
"script" : {
"source": "if (ctx._source.tags.contains(params.tag)) { ctx.op = 'delete' } else { ctx.op = 'none' }",
"lang": "painless",
"params" : {
"tag" : "green"
}
}
}
合并传人的文档字段进行更新
POST my_demo/_doc/1/_update
{
"doc" : {
"name" : "new_name"
}
}
设置不做noop检测

复制代码
POST my_demo/_doc/1/_update
{
"doc" : {
"name" : "new_name"
},
"detect_noop": false
}

noop检测:已经执行过的脚本不再执行

upsert 操作:如果要更新的文档存在,则执行脚本进行更新,如不存在,则把 upsert中的内容作为一个新文档写入。

POST my_demo/_doc/1/_update
{
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 4
}
},
"upsert" : {
"counter" : 1
}
}

通过条件查询来更新文档

1
2
3
4
5
6
7
8
9
10
11
12
POST my_demo/_update_by_query
{
"script": {
"source": "ctx._source.likes++",
"lang": "painless"
},
"query": {
"term": {
"user": "hu"
}
}
}

批量操作

批量操作API /_bulk 可以在一次调用中执行多个索引、删除操作。这可以大大提高索引数据的速度。批量操作内容体需按如下以新行分割的json结构格式给出:

语法:

1
2
3
4
5
6
7
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

说明:

action_and_meta_data: action可以是 index, create, delete and update ,meta_data 指: _index ,_type,_id 请求端点可以是: /_bulk, /{index}/_bulk, {index}/{type}/_bulk

示例:

1
2
3
4
5
6
7
8
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

curl + json 文件 批量索引多个文档

类似于导入文件数据

1
curl -H "Content-Type: application/json" -XPOST "localhost:9200/my_demo/_doc/_bulk?pretty&refresh" --data-binary "@my_demo.json"

数据格式:

1
2
3
4
5
{"index":{"_id":"1"}}
{"字段1":值1,"字段2":值2...}
{"index":{"_id":"6"}}
{"字段1":值1,"字段2":值2...}
···

reindex 重索引

Reindex API /_reindex 让我们可以将一个索引中的数据重索引到另一个索引中(拷贝),要求源索引的_source 是开启的。目标索引的setting 、mapping 信息与源索引无关。

1
2
3
4
5
6
7
8
9
POST _reindex
{
"source": {
"index": "my_demo"
},
"dest": {
"index": "my_demo_1"
}
}

数据有重复的处理:

1、如果没有指定version_type 或指定为 internal,则会是采用目标索引中的版本,重索引过程中,执行的就是新增、更新操作。

1
2
3
4
5
6
7
8
9
10
POST _reindex
{
"source": {
"index": "my_demo"
},
"dest": {
"index": "my_demo_1",
"version_type": "internal"
}
}

2、如果想使用源索引中的版本来进行版本控制更新,则设置 version_type 为extenal。重索引操作将写入不存在的,更新旧版本的数据。

1
2
3
4
5
6
7
8
9
10
POST _reindex
{
"source": {
"index": "my_demo"
},
"dest": {
"index": "my_demo_1",
"version_type": "external"
}
}

3、如果只想从源索引中复制目标索引中不存在的文档数据,可以指定 op_type 为 create 。此时存在的文档将触发 版本冲突(会导致放弃操作),可设置“conflicts”: “proceed“,跳过继续。

1
2
3
4
5
6
7
8
9
10
11
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "my_demo"
},
"dest": {
"index": "my_demo_1",
"op_type": "create"
}
}

4、可以只索引源索引的一部分数据,通过 type 或 查询来指定你需要的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST _reindex
{
"source": {
"index": "my_demo",
"type": "_doc",
"query": {
"term": {
"user": "hu"
}
}
},
"dest": {
"index": "my_demo_1"
}
}

可以从多个源获取数据

1
2
3
4
5
6
7
8
9
10
POST _reindex
{
"source": {
"index": ["my_demo", "my_demo_1"],
"type": ["_doc", "post"]
},
"dest": {
"index": "my_demo_2"
}
}

可以限定文档数量

1
2
3
4
5
6
7
8
9
10
11
POST _reindex
{
"size": 10000,
"source": {
"index": "my_demo",
"sort": { "date": "desc" }
},
"dest": {
"index": "my_demo_1"
}
}

可以选择复制源文档的哪些字段

1
2
3
4
5
6
7
8
9
10
POST _reindex
{
"source": {
"index": "my_demo",
"_source": ["user", "_doc"]
},
"dest": {
"index": "my_demo_1"
}
}

可以用script来改变文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
POST _reindex
{
"source": {
"index": "my_demo"
},
"dest": {
"index": "my_demo_1",
"version_type": "external"
},
"script": {
"source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
"lang": "painless"
}
}

可以指定路由值把文档放到哪个分片上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST _reindex
{
"source": {
"index": "source",
"query": {
"match": {
"company": "cat"
}
}
},
"dest": {
"index": "my_demo_1",
"routing": "=cat"
}
}

从远程源复制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
POST _reindex
{
"source": {
"remote": {
"host": "http://15151515:9200",
"username": "user",
"password": "pass"
},
"index": "my_demo",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "my_demo_1"
}
}

通过_task 来查询执行状态

GET _tasks?detailed=true&actions=*reindex

refresh

对于索引、更新、删除操作如果想操作完后立马重刷新可见,可带上refresh参数

PUT /demo/_doc/1?refresh

refresh 可选值说明

1
2
3
未给值或=true,则立马会重刷新读索引。
=false ,相当于没带refresh 参数,遵循内部的定时刷新。
=wait_for ,登记等待刷新,当登记的请求数达到index.max_refresh_listeners 参数设定的值时(defaults to 1000),将触发重刷新。