Elasticsearch 使用 BulkProcessor 将 Bulk API 进一步封装,大大简化了对文档的 增加/更新/删除 操作。接下来,我们一起来学习一下 BulkProcessor 的具体实现。
版本:6.5.0
一、添加 pom 依赖
本文示例使用的是 Spring Boot 框架,由于该框架有默认的 Elasticsearch 版本,为了避免版本混乱或冲突,我在 pom.xml 文件内添加了如下依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.5.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.5.0</version> </dependency> <dependency> <groupId>org.elasticsearch.plugin</groupId> <artifactId>transport-netty4-client</artifactId> <version>6.5.0</version> </dependency>
|
二、创建 BulkProcessor 实例
1、BulkProcessor 类提供了简单接口去自动刷新 bulk 操作,可设置条件来自动触发 bulk 操作。比如:
- 设置 request 的数量:setBulkActions()
- 设置 request 的大小:setBulkSize()
- 设置 bulk 执行的周期:setFlushInterval()
还可指定一些优化的参数,比如:
- 设置并发请求数:setConcurrentRequests()
- 设置最大重试次数和重试周期:setBackoffPolicy()
2、如果创建 BulkProcessor 实例,需要指定 Elasticsearch 初始化的 client ,这里是用 TransportAddress 来初始化的 client 。client 用于执行 BulkRequest 和 BulkResponse 。
3、BulkProcessor 有一个 Listener ,在每次 BulkRequest 执行之前或之后或 BulkRequest 失败时调用该 Listener 。
具体的 BulkProcessor 的代码实现如下所示(附带详细注释):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| private final static String HOST = "192.168.162.72"; private final static int PORT = 9300; private final static String CLUSTERNAME = "elasticsearch"; private TransportClient client;
private BulkProcessor bulkProcessor() {
Settings settings = Settings.builder().put("cluster.name", CLUSTERNAME).build();
try { client = new PreBuiltTransportClient(settings) .addTransportAddresses(new TransportAddress(InetAddress.getByName(HOST), PORT)); } catch (UnknownHostException e) { logger.error(e.getMessage()); }
return BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.info("序号:{} ,开始执行 {} 条数据批量操作。", executionId, request.numberOfActions()); }
@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { logger.error("Bulk {} executed with failures", executionId); } else { logger.info("序号:{} ,执行 {} 条数据批量操作成功,共耗费{}毫秒。", executionId, request.numberOfActions(), response.getTook().getMillis()); } }
@Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("序号:{} 批量操作失败,总记录数:{} ,报错信息为:{}", executionId, request.numberOfActions(), failure.getMessage()); } }) .setBulkActions(1000) .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); }
|
三、文档批量增加/更新
批量增加/更新文档是将多个 IndexRequest 请求添加到 BulkProcessor 中,其中 IndexRequest 中的文档格式本文提供了两种,分别为 Map 和 Json 。
1、map 格式的写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| private void mapData() { ESBulkProcessor esBulkProcessor = new ESBulkProcessor(); BulkProcessor esBulk = esBulkProcessor.bulkProcessor(); Map<String, Object> m = new HashMap<>(); for (int i = 0; i < 3000; i++) { m.put("name", "name" + i); m.put("age", new Random().nextInt(50)); m.put("sex", Math.round(Math.random()) == 1 ? "男" : "女"); esBulk.add(new IndexRequest("es_map_test", "_doc", String.valueOf(i)).source(m)); } esBulk.flush(); try { esBulk.awaitClose(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } }
|
2、json 格式的写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| private void jsonData() { ESBulkProcessor esBulkProcessor = new ESBulkProcessor(); BulkProcessor esBulk = esBulkProcessor.bulkProcessor(); for (int i = 0; i < 3000; i++) { try { XContentBuilder builder = XContentFactory.jsonBuilder() .startObject() .field("name", "name" + i) .field("age", new Random().nextInt(50)) .field("sex", Math.round(Math.random()) == 1 ? "男" : "女") .endObject(); esBulk.add(new IndexRequest("es_json_test", "_doc", String.valueOf(i)).source(builder)); } catch (IOException e) { e.printStackTrace(); } } esBulk.flush(); try { esBulk.awaitClose(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } }
|
上述实例,是采用 for 循环的方式,将 3000 条数据批量插入 Elasticsearch 的索引中,然后再执行一次 BulkProcessor 的 flush() 操作,确保缓存数据也被提交,最后关闭 BulkProcessor 的连接。关闭连接的方式有两种,上述实例中使用 awaitClose 来约定时间后关闭,还可以使用 close() 来立即关闭。
四、文档批量删除
使用 DeleteRequest 方法指定文档 id 来删除索引内文档,将多个 DeleteRequest 添加到 BulkProcessor 来实现文档的批量删除。具体代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void bulkDelete() { ESBulkProcessor esBulkProcessor = new ESBulkProcessor(); BulkProcessor esBulk = esBulkProcessor.bulkProcessor(); for (int i = 0; i < 3000; i++) { esBulk.add(new DeleteRequest("es_map_test", "_doc", String.valueOf(i))); } esBulk.flush(); try { esBulk.awaitClose(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } }
|
五、总结
执行文档批量请求时,首先需要初始化 Elasticsearch Client,其次创建 BulkProcessor ,还可设置条件来自定义 Bulk 操作,最后就是将多条 Requests 添加到创建的 BulkProcessor 里。
一开始我在学习 BulkProcessor 的时候,犯了一个错误,就是将 esBulkProcessor.bulkProcessor().add 放在了 for 循环中,这样就导致了创建了很多 BulkProcessor 实例,也就导致批量插入数据老是有丢失,这样的写法是不对的。
正确的做法应该将 esBulkProcessor.bulkProcessor() 放到 for 循环外面,这样就只创建了一个 BulkProcessor ,然后将多个 Requests 添加到 BulkProcessor 中去执行。
3000条数据批量插入的执行结果如下图所示:
参考资料:
本文全部代码已上传至 github :
https://github.com/841809077/hdp3project/blob/master/src/main/java/com/hdp3/project/elasticsearch/ESBulkProcessor.java