Elasticsearch 使用 BulkProcessor 将 Bulk API 进一步封装,大大简化了对文档的 增加/更新/删除 操作。接下来,我们一起来学习一下 BulkProcessor 的具体实现。

版本:6.5.0

一、添加 pom 依赖

本文示例使用的是 Spring Boot 框架,由于该框架有默认的 Elasticsearch 版本,为了避免版本混乱或冲突,我在 pom.xml 文件内添加了如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- elasticsearch 6.5.0 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.5.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.5.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>6.5.0</version>
</dependency>

二、创建 BulkProcessor 实例

1、BulkProcessor 类提供了简单接口去自动刷新 bulk 操作,可设置条件来自动触发 bulk 操作。比如:

  • 设置 request 的数量:setBulkActions()
  • 设置 request 的大小:setBulkSize()
  • 设置 bulk 执行的周期:setFlushInterval()

还可指定一些优化的参数,比如:

  • 设置并发请求数:setConcurrentRequests()
  • 设置最大重试次数和重试周期:setBackoffPolicy()

2、如果创建 BulkProcessor 实例,需要指定 Elasticsearch 初始化的 client ,这里是用 TransportAddress 来初始化的 client 。client 用于执行 BulkRequest 和 BulkResponse 。

3、BulkProcessor 有一个 Listener ,在每次 BulkRequest 执行之前或之后或 BulkRequest 失败时调用该 Listener 。

具体的 BulkProcessor 的代码实现如下所示(附带详细注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private final static String HOST = "192.168.162.72";
private final static int PORT = 9300;
private final static String CLUSTERNAME = "elasticsearch";
private TransportClient client;

private BulkProcessor bulkProcessor() {

// 设置集群名称
Settings settings = Settings.builder().put("cluster.name", CLUSTERNAME).build();

// 创建客户端
try {
client = new PreBuiltTransportClient(settings)
.addTransportAddresses(new TransportAddress(InetAddress.getByName(HOST), PORT));
} catch (UnknownHostException e) {
logger.error(e.getMessage());
}

return BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) {
logger.info("序号:{} ,开始执行 {} 条数据批量操作。", executionId, request.numberOfActions());
}

@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) {
// 在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误
if (response.hasFailures()) {
logger.error("Bulk {} executed with failures", executionId);
} else {
logger.info("序号:{} ,执行 {} 条数据批量操作成功,共耗费{}毫秒。", executionId, request.numberOfActions(), response.getTook().getMillis());
}
}

@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
logger.error("序号:{} 批量操作失败,总记录数:{} ,报错信息为:{}", executionId, request.numberOfActions(), failure.getMessage());
}
})
// 每添加1000个request,执行一次bulk操作
.setBulkActions(1000)
// 每达到5M的请求size时,执行一次bulk操作
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
// 每5s执行一次bulk操作
.setFlushInterval(TimeValue.timeValueSeconds(5))
// 设置并发请求数。默认是1,表示允许执行1个并发请求,积累bulk requests和发送bulk是异步的,其数值表示发送bulk的并发线程数(可以为2、3、...);若设置为0表示二者同步。
.setConcurrentRequests(1)
// 最大重试次数为3次,启动延迟为100ms。
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
}

三、文档批量增加/更新

批量增加/更新文档是将多个 IndexRequest 请求添加到 BulkProcessor 中,其中 IndexRequest 中的文档格式本文提供了两种,分别为 Map 和 Json 。

1、map 格式的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void mapData() {
ESBulkProcessor esBulkProcessor = new ESBulkProcessor();
BulkProcessor esBulk = esBulkProcessor.bulkProcessor();
Map<String, Object> m = new HashMap<>();
for (int i = 0; i < 3000; i++) {
m.put("name", "name" + i);
m.put("age", new Random().nextInt(50));
m.put("sex", Math.round(Math.random()) == 1 ? "男" : "女");
esBulk.add(new IndexRequest("es_map_test", "_doc", String.valueOf(i)).source(m));
}
// 最后执行一次刷新操作
esBulk.flush();
// 30秒后关闭BulkProcessor
try {
esBulk.awaitClose(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

2、json 格式的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void jsonData() {
ESBulkProcessor esBulkProcessor = new ESBulkProcessor();
BulkProcessor esBulk = esBulkProcessor.bulkProcessor();
for (int i = 0; i < 3000; i++) {
try {
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("name", "name" + i)
.field("age", new Random().nextInt(50))
.field("sex", Math.round(Math.random()) == 1 ? "男" : "女")
.endObject();
// 如果json是String类型的话,xx.source(jsonString, XContentType.JSON)
esBulk.add(new IndexRequest("es_json_test", "_doc", String.valueOf(i)).source(builder));
} catch (IOException e) {
e.printStackTrace();
}
}
// 最后执行一次刷新操作
esBulk.flush();
// 30秒后关闭BulkProcessor
try {
esBulk.awaitClose(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

上述实例,是采用 for 循环的方式,将 3000 条数据批量插入 Elasticsearch 的索引中,然后再执行一次 BulkProcessor 的 flush() 操作,确保缓存数据也被提交,最后关闭 BulkProcessor 的连接。关闭连接的方式有两种,上述实例中使用 awaitClose 来约定时间后关闭,还可以使用 close() 来立即关闭。

四、文档批量删除

使用 DeleteRequest 方法指定文档 id 来删除索引内文档,将多个 DeleteRequest 添加到 BulkProcessor 来实现文档的批量删除。具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void bulkDelete() {
ESBulkProcessor esBulkProcessor = new ESBulkProcessor();
BulkProcessor esBulk = esBulkProcessor.bulkProcessor();
for (int i = 0; i < 3000; i++) {
esBulk.add(new DeleteRequest("es_map_test", "_doc", String.valueOf(i)));
}
// 最后执行一次刷新操作
esBulk.flush();
// 30秒后关闭BulkProcessor
try {
esBulk.awaitClose(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

五、总结

执行文档批量请求时,首先需要初始化 Elasticsearch Client,其次创建 BulkProcessor ,还可设置条件来自定义 Bulk 操作,最后就是将多条 Requests 添加到创建的 BulkProcessor 里。

一开始我在学习 BulkProcessor 的时候,犯了一个错误,就是将 esBulkProcessor.bulkProcessor().add 放在了 for 循环中,这样就导致了创建了很多 BulkProcessor 实例,也就导致批量插入数据老是有丢失,这样的写法是不对的。

正确的做法应该将 esBulkProcessor.bulkProcessor() 放到 for 循环外面,这样就只创建了一个 BulkProcessor ,然后将多个 Requests 添加到 BulkProcessor 中去执行。

3000条数据批量插入的执行结果如下图所示:

参考资料:

本文全部代码已上传至 github :

https://github.com/841809077/hdp3project/blob/master/src/main/java/com/hdp3/project/elasticsearch/ESBulkProcessor.java