Elasticsearch 5.6.16

参考资料:https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.6/transport-client.html

一、Elasticsearch transport client java 操作

1、创建 ES 连接(长连接,加锁):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static TransportClient client;

/**
* 初始化 Elaticsearch transportClient 连接
*/
public static TransportClient initConn() {
log.info("Initialize Elasticsearch transportClient connection.");
if (client == null) {
synchronized (ElasticsearchUtils.class) {
if (client == null) {
Settings settings = Settings.builder()
.put("cluster.name", CLUSTER_NAME)
.put("client.transport.sniff", false)
.put("client.transport.ignore_cluster_name", true)
.build();
client = new PreBuiltTransportClient(settings);

Object clusterNodesObj = PropertiesUtils.getCommonYml("cluster.address.event");
if (clusterNodesObj == null) {
throw new CustomException(ResultEnum.YML_READ_FAILED);
}
String clusterNodes = clusterNodesObj.toString();

log.info("ElasticSearch transport address: {}", clusterNodes);
for (String node : StrUtil.split(clusterNodes, COMMA)) {
String host = StrUtil.subBefore(node, COLON, false);
String port = StrUtil.subAfter(node, COLON, false);
Assert.hasText(host, "[Assertion failed] missing host name in'cluster.address.event'.");
Assert.hasText(port, "[Assertion failed] missing port in'cluster.address.event'.");
try {
client.addTransportAddress(
new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port)));
} catch (UnknownHostException e) {
log.error("Failed to initialize the client.", e);
throw new CustomException(ResultEnum.CLIENT_INITIALIZE_FAILED);
}
}
}
}
}
return client;
}

通过 PropertiesUtils 工具类来获取 yml 文件的属性值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class PropertiesUtils {

private static String PROPERTY_NAME = "application-dev.yml";

public static Object getCommonYml(Object key) {
Resource resource = new ClassPathResource(PROPERTY_NAME);
Properties properties = null;
try {
YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
yamlFactory.setResources(resource);
properties = yamlFactory.getObject();
} catch (Exception e) {
log.error("Failed to read yml configuration.", e);
return null;
}
assert properties != null;
return properties.get(key);
}
}

2、创建索引模板:

1
2
3
4
5
TransportClient transportClient = initConn();
// 创建模板
// dbcode: 模板名称
// jsonObject: 模板内容,JSONObject 类型。还支持 XContentBuilder、String 类型
PutIndexTemplateResponse res = transportClient.admin().indices().preparePutTemplate(dbcode).setSource(jsonObject).get();

3、判断索引模板是否存在:

1
2
3
4
5
6
7
8
9
/**
* 判断索引模板是否存在
*
* @param indexTemplateName
*/
public boolean existsIndexTemplate(String indexTemplateName){
GetIndexTemplatesResponse getIndexTemplatesResponse = client.admin().indices().prepareGetTemplates(indexTemplateName).get();
return getIndexTemplatesResponse.getIndexTemplates().isEmpty();
}

4、删除索引模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 删除ES索引模板
*
* @param dbcode
*/
public static void deleteTemplate(String dbcode) {
TransportClient transportClient = initConn();
try {
transportClient.admin().indices().prepareDeleteTemplate(dbcode).execute();
} finally {
// 关闭连接
close(transportClient);
}
}

5、创建以日期命名的索引:

1
2
3
4
// 获取日期:年月
String currentYm = DateUtil.format(DateUtil.date(), "yyyyMM");
// 创建索引
CreateIndexResponse res = transportClient.admin().indices().prepareCreate(dbcode + "-" + currentYm).setSource(jsonObject).execute().actionGet();

6、插入索引:

1
2
3
4
5
6
7
8
9
10
11
/**
* 向索引中插入数据
*
* @param index:索引名称
* @param jsonObject:插入的索引内容
* @param access_log:索引mapping type
*/
public static IndexResponse insertToIndex(String index, JSONObject jsonObject) {
TransportClient client = initConn();
return client.prepareIndex(index, "access_log").setSource(jsonObject).get();
}

7、判断索引是否存在,如果存在就删除索引:

1
2
3
4
5
6
7
// dbcode支持通配符:aa-*
IndicesExistsRequest request = new IndicesExistsRequest(dbcode);
IndicesExistsResponse response = transportClient.admin().indices().exists(request).actionGet();
if (response.isExists()) {
transportClient.admin().indices()
.prepareDelete(dbcode).execute().actionGet();
}

8、前缀通配符获取索引列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 根据dbcode查询符合匹配的索引名称
*
* @param indexPrefix: test-index-*, *必不可少。
*/
public static Set<String> getPrefixIndex(String indexPrefix) {
TransportClient transportClient = initConn();
Set<String> set = null;
try {
set = transportClient.admin().indices().prepareStats(indexPrefix).get().getIndices().keySet();
} catch (Exception e) {
log.error("The query index name is abnormal: ", e);
} finally {
// 关闭连接
close(transportClient);
}
return set;
}

Elasticsearch常用API 整理:

9、根据docId更新某文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(indexName);
updateRequest.type(EventAttrConst.EVENTS_TYPE);
updateRequest.id(docId);
updateRequest.doc(jsonBuilder()
.startObject()
.field(EventAttrConst.STATE, "male")
.field(EventAttrConst.HANDLE_DATE, "male")
.field(EventAttrConst.HANDLE_REMARK, "male")
.endObject());
UpdateResponse updateResponse = client.update(updateRequest).get();
if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
logger.info("更新成功");
}