Elasticsearch Transport Client Operations in Java
Elasticsearch 5.6.16
Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.6/transport-client.html
I. Elasticsearch Transport Client Operations in Java
1. Create the ES connection (long-lived connection, guarded by a lock):
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    // Shared, long-lived client. volatile is required for double-checked locking to be safe.
    private static volatile TransportClient client;

    public static TransportClient initConn() {
        if (client == null) {
            synchronized (ElasticsearchUtils.class) {
                if (client == null) {
                    log.info("Initialize Elasticsearch transportClient connection.");
                    Settings settings = Settings.builder()
                            .put("cluster.name", CLUSTER_NAME)
                            .put("client.transport.sniff", false)
                            .put("client.transport.ignore_cluster_name", true)
                            .build();

                    // Read the comma-separated "host:port" list from the yml file.
                    Object clusterNodesObj = PropertiesUtils.getCommonYml("cluster.address.event");
                    if (clusterNodesObj == null) {
                        throw new CustomException(ResultEnum.YML_READ_FAILED);
                    }
                    String clusterNodes = clusterNodesObj.toString();
                    log.info("ElasticSearch transport address: {}", clusterNodes);

                    TransportClient newClient = new PreBuiltTransportClient(settings);
                    for (String node : StrUtil.split(clusterNodes, COMMA)) {
                        String host = StrUtil.subBefore(node, COLON, false);
                        String port = StrUtil.subAfter(node, COLON, false);
                        Assert.hasText(host, "[Assertion failed] missing host name in 'cluster.address.event'.");
                        Assert.hasText(port, "[Assertion failed] missing port in 'cluster.address.event'.");
                        try {
                            newClient.addTransportAddress(
                                    new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port)));
                        } catch (UnknownHostException e) {
                            log.error("Failed to initialize the client.", e);
                            newClient.close(); // do not leak a half-configured client
                            throw new CustomException(ResultEnum.CLIENT_INITIALIZE_FAILED);
                        }
                    }
                    // Publish the client only after every address has been added.
                    client = newClient;
                }
            }
        }
        return client;
    }
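The double-checked locking idiom used by initConn() is only safe when the shared field is volatile and is assigned after the object is fully set up. Here is a minimal, library-free sketch of that idiom. The LazySingleton class and its Supplier-based factory are my own illustrative names and are not part of the Elasticsearch API or of the utility class above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Lazily creates one shared instance using double-checked locking (illustrative helper). */
public class LazySingleton<T> {
    // volatile guarantees that a thread which sees a non-null reference
    // also sees the fully constructed object behind it.
    private volatile T instance;
    private final Supplier<T> factory;

    public LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    public T get() {
        T local = instance;              // one volatile read on the fast path
        if (local == null) {
            synchronized (this) {
                local = instance;        // re-check while holding the lock
                if (local == null) {
                    local = factory.get();
                    instance = local;    // publish only after construction has finished
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        LazySingleton<Object> holder = new LazySingleton<>(() -> {
            created.incrementAndGet();
            return new Object();
        });
        // Several threads race to initialize; the factory should still run only once.
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(holder::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("factory calls: " + created.get());
    }
}
```

In the connection code, the factory would correspond to building the PreBuiltTransportClient and adding all transport addresses before the reference is stored.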
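The PropertiesUtils utility class reads property values from the yml file:
    import java.util.Properties;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.io.Resource;

    @Slf4j
    public class PropertiesUtils {

        private static final String PROPERTY_NAME = "application-dev.yml";

        // Returns the value for the given flattened key (e.g. "cluster.address.event"),
        // or null if the file cannot be read or the key is absent.
        public static Object getCommonYml(Object key) {
            Resource resource = new ClassPathResource(PROPERTY_NAME);
            Properties properties;
            try {
                YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
                yamlFactory.setResources(resource);
                properties = yamlFactory.getObject();
            } catch (Exception e) {
                log.error("Failed to read yml configuration.", e);
                return null;
            }
            if (properties == null) {
                return null;
            }
            return properties.get(key);
        }
    }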
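initConn() treats the value of cluster.address.event as a comma-separated list of host:port entries. The sketch below shows that parsing step with plain JDK calls instead of StrUtil, so the expected format is easier to see. The ClusterAddressParser and HostPort names, and the sample addresses in main, are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Parses a "host:port,host:port" string (illustrative helper, not part of the original utilities). */
public class ClusterAddressParser {

    /** One parsed "host:port" entry. */
    public static final class HostPort {
        public final String host;
        public final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    public static List<HostPort> parse(String clusterNodes) {
        List<HostPort> result = new ArrayList<>();
        for (String node : clusterNodes.split(",")) {
            String trimmed = node.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing comma
            }
            // Split at the first colon, as the original subBefore/subAfter calls do.
            int colon = trimmed.indexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon).trim();
            // parseInt throws NumberFormatException (an IllegalArgumentException) for a non-numeric port.
            int port = Integer.parseInt(trimmed.substring(colon + 1).trim());
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + trimmed + "'");
            }
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample value only; the real one comes from application-dev.yml.
        for (HostPort hp : parse("10.0.0.1:9300, 10.0.0.2:9300")) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```

Validating the whole list up front means a typo in the yml file fails fast with a clear message, before any client is created.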
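2. Create an index template:
    TransportClient transportClient = initConn();

    // dbcode is the template name; jsonObject holds the template body
    // (index pattern, settings, mappings).
    PutIndexTemplateResponse res = transportClient.admin().indices()
            .preparePutTemplate(dbcode)
            .setSource(jsonObject)
            .get();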
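The article does not show what jsonObject contains, so here is a rough sketch of how a template body for Elasticsearch 5.x could be assembled as a plain Map. In 5.x the index pattern goes under the "template" key (6.0 renamed it to "index_patterns"). The shard counts and the timestamp and message fields are assumptions for illustration only; the access_log type name is borrowed from the insert example in step 6.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds an example 5.x index template body as nested maps (illustrative values only). */
public class TemplateSourceSketch {

    public static Map<String, Object> buildTemplateSource(String dbcode) {
        Map<String, Object> settings = new LinkedHashMap<>();
        settings.put("number_of_shards", 1);      // assumed value
        settings.put("number_of_replicas", 1);    // assumed value

        Map<String, Object> timestampField = new LinkedHashMap<>();
        timestampField.put("type", "date");
        Map<String, Object> messageField = new LinkedHashMap<>();
        messageField.put("type", "keyword");

        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("timestamp", timestampField);  // hypothetical field
        properties.put("message", messageField);      // hypothetical field

        Map<String, Object> typeMapping = new LinkedHashMap<>();
        typeMapping.put("properties", properties);

        Map<String, Object> mappings = new LinkedHashMap<>();
        mappings.put("access_log", typeMapping);      // mapping type used in step 6

        Map<String, Object> source = new LinkedHashMap<>();
        source.put("template", dbcode + "-*");        // matches indices such as dbcode-201906
        source.put("settings", settings);
        source.put("mappings", mappings);
        return source;
    }

    public static void main(String[] args) {
        System.out.println(buildTemplateSource("app"));
    }
}
```

A map like this can stand in for jsonObject in the preparePutTemplate(...).setSource(...) call, assuming the Map overload of setSource is the one being used.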
3. Check whether an index template exists:
    public boolean existsIndexTemplate(String indexTemplateName) {
        GetIndexTemplatesResponse getIndexTemplatesResponse = client.admin().indices()
                .prepareGetTemplates(indexTemplateName)
                .get();
        // The template exists when the returned list is NOT empty.
        return !getIndexTemplatesResponse.getIndexTemplates().isEmpty();
    }
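4. Delete an index template:
    public static void deleteTemplate(String dbcode) {
        TransportClient transportClient = initConn();
        try {
            // get() waits for the response, so the request has finished before close() runs.
            transportClient.admin().indices().prepareDeleteTemplate(dbcode).get();
        } finally {
            // close(...) is a helper of this utility class (not shown here). If it really
            // closes the shared client, it must also reset the static field so that
            // initConn() can create a new one.
            close(transportClient);
        }
    }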
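5. Create an index named by date:
    // Current year and month, e.g. "201906" (DateUtil comes from Hutool).
    String currentYm = DateUtil.format(DateUtil.date(), "yyyyMM");

    // The resulting index name is "<dbcode>-<yyyyMM>".
    CreateIndexResponse res = transportClient.admin().indices()
            .prepareCreate(dbcode + "-" + currentYm)
            .setSource(jsonObject)
            .execute()
            .actionGet();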
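If you would rather not depend on Hutool's DateUtil for the index name, the same dbcode-yyyyMM name can be produced with java.time. This is only a sketch; MonthlyIndexName and its method names are made up for illustration.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;

/** Builds monthly index names of the form "<dbcode>-<yyyyMM>" (illustrative helper). */
public class MonthlyIndexName {

    private static final DateTimeFormatter YEAR_MONTH = DateTimeFormatter.ofPattern("yyyyMM");

    /** Name for an explicit month, which also keeps the method easy to test. */
    public static String forMonth(String dbcode, YearMonth month) {
        return dbcode + "-" + month.format(YEAR_MONTH);
    }

    /** Name for the current month in the system default time zone. */
    public static String current(String dbcode) {
        return forMonth(dbcode, YearMonth.now());
    }

    public static void main(String[] args) {
        System.out.println(forMonth("app", YearMonth.of(2019, 6))); // prints app-201906
        System.out.println(current("app"));
    }
}
```

Passing the month in explicitly makes it straightforward to compute names for earlier months as well, for example when cleaning up old indices.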
6. Insert a document into an index:
    public static IndexResponse insertToIndex(String index, JSONObject jsonObject) {
        TransportClient client = initConn();
        // "access_log" is the mapping type; no id is set, so Elasticsearch generates one.
        return client.prepareIndex(index, "access_log")
                .setSource(jsonObject)
                .get();
    }
7. Check whether an index exists, and delete it if it does:
    // Here dbcode is the name of the index to check.
    IndicesExistsRequest request = new IndicesExistsRequest(dbcode);
    IndicesExistsResponse response = transportClient.admin().indices().exists(request).actionGet();
    if (response.isExists()) {
        transportClient.admin().indices()
                .prepareDelete(dbcode)
                .execute()
                .actionGet();
    }
8. List indices by prefix wildcard:
    // indexPrefix must already contain the wildcard, e.g. "dbcode-*".
    // Returns null when the lookup fails.
    public static Set<String> getPrefixIndex(String indexPrefix) {
        TransportClient transportClient = initConn();
        Set<String> set = null;
        try {
            set = transportClient.admin().indices()
                    .prepareStats(indexPrefix)
                    .get()
                    .getIndices()
                    .keySet();
        } catch (Exception e) {
            log.error("The query index name is abnormal: ", e);
        } finally {
            close(transportClient);
        }
        return set;
    }
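Common Elasticsearch APIs:
9. Update a document by docId:
    // Requires: import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index(indexName);
    updateRequest.type(EventAttrConst.EVENTS_TYPE);
    updateRequest.id(docId);
    // Partial document: only these fields are merged into the existing document.
    // The "male" values are placeholders.
    updateRequest.doc(jsonBuilder()
            .startObject()
            .field(EventAttrConst.STATE, "male")
            .field(EventAttrConst.HANDLE_DATE, "male")
            .field(EventAttrConst.HANDLE_REMARK, "male")
            .endObject());
    UpdateResponse updateResponse = client.update(updateRequest).get();
    // The result is NOOP rather than UPDATED when the document already holds these values.
    if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        logger.info("Update succeeded");
    }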
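Goodbye
That's all for this article. If you have read this far, you are a real talent.
Freeloading isn't great, and writing isn't easy. Your support and recognition are my biggest motivation. See you in the next article!
If you find any errors in this post, please point them out. I would be very grateful!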