|
@@ -0,0 +1,451 @@
|
|
|
+package com.elab.es.data;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.elab.core.utils.DateUtils;
|
|
|
+import org.apache.http.HttpHost;
|
|
|
+import org.apache.http.auth.AuthScope;
|
|
|
+import org.apache.http.auth.UsernamePasswordCredentials;
|
|
|
+import org.apache.http.client.CredentialsProvider;
|
|
|
+import org.apache.http.impl.client.BasicCredentialsProvider;
|
|
|
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
|
|
|
+import org.elasticsearch.action.DocWriteResponse;
|
|
|
+import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
|
+import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
+import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
+import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
+import org.elasticsearch.action.delete.DeleteResponse;
|
|
|
+import org.elasticsearch.action.get.GetRequest;
|
|
|
+import org.elasticsearch.action.get.GetResponse;
|
|
|
+import org.elasticsearch.action.get.MultiGetRequest;
|
|
|
+import org.elasticsearch.action.get.MultiGetResponse;
|
|
|
+import org.elasticsearch.action.index.IndexRequest;
|
|
|
+import org.elasticsearch.action.index.IndexResponse;
|
|
|
+import org.elasticsearch.action.search.SearchRequest;
|
|
|
+import org.elasticsearch.action.search.SearchResponse;
|
|
|
+import org.elasticsearch.action.update.UpdateRequest;
|
|
|
+import org.elasticsearch.action.update.UpdateResponse;
|
|
|
+import org.elasticsearch.client.RequestOptions;
|
|
|
+import org.elasticsearch.client.RestClient;
|
|
|
+import org.elasticsearch.client.RestClientBuilder;
|
|
|
+import org.elasticsearch.client.RestHighLevelClient;
|
|
|
+import org.elasticsearch.client.core.TermVectorsRequest;
|
|
|
+import org.elasticsearch.client.core.TermVectorsResponse;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
+import org.elasticsearch.common.unit.TimeValue;
|
|
|
+import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
+import org.elasticsearch.common.xcontent.XContentFactory;
|
|
|
+import org.elasticsearch.common.xcontent.XContentType;
|
|
|
+import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
|
|
|
+import org.elasticsearch.index.query.QueryBuilders;
|
|
|
+import org.elasticsearch.index.query.TermQueryBuilder;
|
|
|
+import org.elasticsearch.index.reindex.ReindexRequest;
|
|
|
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
|
|
+import org.elasticsearch.rest.RestStatus;
|
|
|
+import org.elasticsearch.search.SearchHit;
|
|
|
+import org.elasticsearch.search.SearchHits;
|
|
|
+import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+/**
|
|
|
+ * es基本操作
|
|
|
+ *
|
|
|
+ * @author : liukx
|
|
|
+ * @time : 2019/7/31 - 17:01
|
|
|
+ */
|
|
|
+public class ALiYunRestHighLevelClientTest {
|
|
|
+
|
|
|
+ private Logger logger = LoggerFactory.getLogger(ALiYunRestHighLevelClientTest.class);
|
|
|
+ private RestHighLevelClient client;
|
|
|
+
|
|
|
+
|
|
|
+ private String host = "es-cn-0pp1jc07f001i1t59.public.elasticsearch.aliyuncs.com";
|
|
|
+ private String username = "elastic";
|
|
|
+ private String password = "elab@123";
|
|
|
+
|
|
|
+ private String indexName = "posts";
|
|
|
+
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void before() {
|
|
|
+// client = new RestHighLevelClient(
|
|
|
+// RestClient.builder(
|
|
|
+//// new HttpHost("localhost", 9200, "http"),
|
|
|
+// new HttpHost("192.168.0.24", 9200, "http")));
|
|
|
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
|
+ //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
|
|
|
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
|
|
+
|
|
|
+ // 通过builder创建rest client,配置http client的HttpClientConfigCallback。
|
|
|
+ // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。
|
|
|
+ RestClientBuilder builder = RestClient.builder(new HttpHost(host, 9200))
|
|
|
+ .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
|
|
|
+ @Override
|
|
|
+ public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
|
|
|
+ return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // RestHighLevelClient实例通过REST low-level client builder进行构造。
|
|
|
+ client = new RestHighLevelClient(builder);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void generateData(String username, String message) throws IOException {
|
|
|
+ IndexRequest request = jsonData(username, message);
|
|
|
+ //设置超时时间
|
|
|
+ request.timeout(TimeValue.timeValueSeconds(1));
|
|
|
+ IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
|
|
|
+
|
|
|
+ logger.info(" result : " + indexResponse.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void createData() throws IOException {
|
|
|
+ generateData("周杰伦", "窗外的麻雀,在电线杆上多嘴");
|
|
|
+ generateData("林俊杰", "圈圈圆圆圈圈,天天年年天天的我,深深看你的脸");
|
|
|
+ generateData("潘玮柏", "天天都需要你爱,我的心思由你猜");
|
|
|
+ generateData("周笔畅", "我很想飞,多远都不会累.");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void indexAPI() throws IOException {
|
|
|
+ IndexRequest request = null;
|
|
|
+ // 下面是简单介绍的三种格式类型
|
|
|
+ request = jsonData("周杰伦", "窗外的麻雀,在电线杆上多嘴。");
|
|
|
+// request = mapData("林俊杰", "确认过眼神,我遇上对的人");
|
|
|
+// request = docData("某某某", "哈哈,终于成功了");
|
|
|
+
|
|
|
+ //设置超时时间
|
|
|
+ request.timeout(TimeValue.timeValueSeconds(1));
|
|
|
+
|
|
|
+ //设置版本号
|
|
|
+// request.version(1);
|
|
|
+// request.versionType(VersionType.EXTERNAL);
|
|
|
+// request.opType(DocWriteRequest.OpType.CREATE);
|
|
|
+// request.setPipeline("pipeline");
|
|
|
+
|
|
|
+
|
|
|
+ // 同步执行方式posts
|
|
|
+ IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
|
|
|
+
|
|
|
+ logger.info(" result : " + indexResponse.toString());
|
|
|
+// String index = indexResponse.getIndex();
|
|
|
+// String type = indexResponse.getType();
|
|
|
+// String id = indexResponse.getId();
|
|
|
+// long version = indexResponse.getVersion();
|
|
|
+
|
|
|
+
|
|
|
+ // 异步方式
|
|
|
+// client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
|
|
|
+// @Override
|
|
|
+// public void onResponse(IndexResponse indexResponse) {
|
|
|
+// RestStatus status = indexResponse.status();
|
|
|
+// System.out.println(status.getStatus());
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void onFailure(Exception e) {
|
|
|
+// System.out.println("执行失败.." + e.getMessage());
|
|
|
+// }
|
|
|
+// });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据id获取数据
|
|
|
+ *
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void getApi() throws IOException {
|
|
|
+ GetRequest request = new GetRequest(
|
|
|
+ indexName,
|
|
|
+ "_doc",
|
|
|
+ "3533216d-2b23-4bd2-a129-7c0ede2ed5b3");
|
|
|
+// request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
|
|
|
+
|
|
|
+// String[] includes = new String[]{"message", "*Date"};
|
|
|
+// String[] excludes = Strings.EMPTY_ARRAY;
|
|
|
+// FetchSourceContext fetchSourceContext =
|
|
|
+// new FetchSourceContext(true, includes, excludes);
|
|
|
+// request.fetchSourceContext(fetchSourceContext);
|
|
|
+// request.storedFields("message");
|
|
|
+ GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
|
|
|
+ // 批量获取
|
|
|
+ // GetResponse getResponse = client.mget()
|
|
|
+
|
|
|
+// String message = getResponse.getField("message").getValue();
|
|
|
+ logger.info("--------" + getResponse.getSource().toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void exists() throws IOException {
|
|
|
+ GetRequest getRequest = new GetRequest(
|
|
|
+ indexName,
|
|
|
+ "_doc",
|
|
|
+ "3533216d-2b23-4bd2-a129-7c0ede2ed5b3");
|
|
|
+ getRequest.fetchSourceContext(new FetchSourceContext(false));
|
|
|
+ getRequest.storedFields("_none_");
|
|
|
+ boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
|
|
|
+ logger.info("result : " + exists);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void delete() throws IOException {
|
|
|
+ DeleteRequest request = new DeleteRequest(
|
|
|
+ indexName,
|
|
|
+ "_doc",
|
|
|
+ "settings");
|
|
|
+ DeleteResponse deleteResponse = client.delete(
|
|
|
+ request, RequestOptions.DEFAULT);
|
|
|
+ RestStatus status = deleteResponse.status();
|
|
|
+
|
|
|
+ // deleteResponse.getShardInfo().getTotal()
|
|
|
+ logger.info(status.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void updateAPI() throws IOException {
|
|
|
+ Map<String, Object> jsonMap = new HashMap<>();
|
|
|
+ jsonMap.put("postDate", new Date());
|
|
|
+ jsonMap.put("user", "daily update");
|
|
|
+ UpdateRequest request = new UpdateRequest(indexName, "_doc", "c9ef49c1-e04b-4d1b-84ef-33ca7004702b")
|
|
|
+ .doc(jsonMap);
|
|
|
+ // 如果尝试更新的时候,发现其他线程也在更改该数据,这里会指定重试次数
|
|
|
+// request.retryOnConflict(3);
|
|
|
+ // 当指定了版本号之后,会与ES中的文档数据进行比对,相同才会更改,不同则会异常。
|
|
|
+ request.version(4);
|
|
|
+ UpdateResponse updateResponse = client.update(
|
|
|
+ request, RequestOptions.DEFAULT);
|
|
|
+ updateResponse.getGetResult();
|
|
|
+ DocWriteResponse.Result result = updateResponse.getResult();
|
|
|
+ if (result == DocWriteResponse.Result.UPDATED) {
|
|
|
+ logger.info("ES 执行了 UPDATE 操作.");
|
|
|
+ }
|
|
|
+ logger.info(" =>> " + updateResponse.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void likeAPI() throws IOException {
|
|
|
+// GetRequest request = new GetRequest(indexName, "2da3f2f9-b21d-4bee-8064-64d2f8171db2");
|
|
|
+// GetResponse documentFields = client.get(request, RequestOptions.DEFAULT);
|
|
|
+// Map<String, Object> source = documentFields.getSource();
|
|
|
+// System.out.println(JSON.toJSONString(source));
|
|
|
+ SearchRequest searchRequest = new SearchRequest(indexName);
|
|
|
+ MatchPhraseQueryBuilder matchPhraseQueryBuilder = QueryBuilders.matchPhraseQuery("message", "我");
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
|
+ searchSourceBuilder.query(matchPhraseQueryBuilder);
|
|
|
+ searchRequest.source(searchSourceBuilder);
|
|
|
+ List<String> strings = listSearchResult(searchRequest);
|
|
|
+ System.out.println(JSON.toJSONString(strings));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void termVectorsApi() throws IOException {
|
|
|
+ // todo 待研究 1
|
|
|
+// TermVectorsRequest request = new TermVectorsRequest(indexName, "doc", "1");
|
|
|
+// request.setFields("user");
|
|
|
+ XContentBuilder docBuilder = XContentFactory.jsonBuilder();
|
|
|
+ docBuilder.startObject().field("user", "guest-user").endObject();
|
|
|
+ TermVectorsRequest request = new TermVectorsRequest(indexName,
|
|
|
+ "_doc",
|
|
|
+ docBuilder);
|
|
|
+
|
|
|
+
|
|
|
+ // 可选参数
|
|
|
+ request.setFieldStatistics(false);
|
|
|
+ request.setTermStatistics(true);
|
|
|
+ request.setPositions(false);
|
|
|
+ request.setOffsets(false);
|
|
|
+ request.setPayloads(false);
|
|
|
+
|
|
|
+ Map<String, Integer> filterSettings = new HashMap<>();
|
|
|
+ filterSettings.put("max_num_terms", 3);
|
|
|
+ filterSettings.put("min_term_freq", 1);
|
|
|
+ filterSettings.put("max_term_freq", 10);
|
|
|
+ filterSettings.put("min_doc_freq", 1);
|
|
|
+ filterSettings.put("max_doc_freq", 100);
|
|
|
+ filterSettings.put("min_word_length", 1);
|
|
|
+ filterSettings.put("max_word_length", 10);
|
|
|
+
|
|
|
+ request.setFilterSettings(filterSettings);
|
|
|
+
|
|
|
+ Map<String, String> perFieldAnalyzer = new HashMap<>();
|
|
|
+ perFieldAnalyzer.put("user", "keyword");
|
|
|
+ request.setPerFieldAnalyzer(perFieldAnalyzer);
|
|
|
+
|
|
|
+ request.setRealtime(false);
|
|
|
+ request.setRouting("routing");
|
|
|
+ TermVectorsResponse response =
|
|
|
+ client.termvectors(request, RequestOptions.DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void bulkApi() throws IOException {
|
|
|
+// BulkRequest request = new BulkRequest();
|
|
|
+// request.add(new IndexRequest(indexName, "_doc", "1")
|
|
|
+// .source(XContentType.JSON,"field", "foo"));
|
|
|
+// request.add(new IndexRequest(indexName, "_doc", "2")
|
|
|
+// .source(XContentType.JSON,"field", "bar"));
|
|
|
+// request.add(new IndexRequest(indexName, "_doc", "3")
|
|
|
+// .source(XContentType.JSON,"field", "baz"));
|
|
|
+
|
|
|
+ // 混合使用
|
|
|
+ BulkRequest request = new BulkRequest();
|
|
|
+// request.add(new DeleteRequest(indexName, "_doc", "3"));
|
|
|
+ request.add(new UpdateRequest(indexName, "_doc", "633b5f31-8cfd-47d4-bc9a-a151fb23633f")
|
|
|
+ .doc(XContentType.JSON, "user", "林俊杰1"));
|
|
|
+// request.add(new IndexRequest(indexName, "_doc", "4")
|
|
|
+// .source(XContentType.JSON,"field", "baz"));
|
|
|
+
|
|
|
+ BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
|
|
|
+ for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
|
|
+ DocWriteResponse itemResponse = bulkItemResponse.getResponse();
|
|
|
+
|
|
|
+ switch (bulkItemResponse.getOpType()) {
|
|
|
+ case INDEX:
|
|
|
+ case CREATE:
|
|
|
+ IndexResponse indexResponse = (IndexResponse) itemResponse;
|
|
|
+ break;
|
|
|
+ case UPDATE:
|
|
|
+ UpdateResponse updateResponse = (UpdateResponse) itemResponse;
|
|
|
+ logger.info(updateResponse.toString());
|
|
|
+ break;
|
|
|
+ case DELETE:
|
|
|
+ DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void multiGetAPI() throws IOException {
|
|
|
+ MultiGetRequest request = new MultiGetRequest();
|
|
|
+ String[] includes = new String[]{"user", "*1"};
|
|
|
+ String[] excludes = Strings.EMPTY_ARRAY;
|
|
|
+ FetchSourceContext fetchSourceContext =
|
|
|
+ new FetchSourceContext(true, includes, excludes);
|
|
|
+ request.add(new MultiGetRequest.Item(indexName, "_doc", "633b5f31-8cfd-47d4-bc9a-a151fb23633f")
|
|
|
+ .fetchSourceContext(fetchSourceContext));
|
|
|
+ MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
|
|
|
+ response.forEach((K) -> {
|
|
|
+ logger.info(K.toString());
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void reindexApi() {
|
|
|
+ ReindexRequest request = new ReindexRequest();
|
|
|
+ request.setSourceIndices("source1", "source2");
|
|
|
+ request.setDestIndex("dest");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void updateByQueryAPI() {
|
|
|
+ UpdateByQueryRequest request =
|
|
|
+ new UpdateByQueryRequest("source1", "source2");
|
|
|
+
|
|
|
+
|
|
|
+ request.setConflicts("proceed");
|
|
|
+
|
|
|
+ request.setQuery(new TermQueryBuilder("user", "kimchy"));
|
|
|
+ request.setSize(10);
|
|
|
+
|
|
|
+ request.setPipeline("my_pipeline");
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private IndexRequest jsonData(String user, String message) {
|
|
|
+ // 创建一个普通的数据对象
|
|
|
+ IndexRequest request = new IndexRequest(
|
|
|
+ indexName,
|
|
|
+ "_doc",
|
|
|
+ UUID.randomUUID().toString());
|
|
|
+ String date = DateUtils.dateParseString(new Date());
|
|
|
+ String jsonString = "{" +
|
|
|
+ "\"user\":\"" + user + "\"," +
|
|
|
+ "\"postDate\":\"" + new Date().getTime() + "\"," +
|
|
|
+ "\"message\":\"" + message + "\"" +
|
|
|
+ "}";
|
|
|
+ IndexRequest source = request.source(jsonString, XContentType.JSON);
|
|
|
+ return source;
|
|
|
+ }
|
|
|
+
|
|
|
+ private IndexRequest mapData(String user, String message) {
|
|
|
+ // 构建一个Map对象
|
|
|
+ Map<String, Object> jsonMap = new HashMap<>();
|
|
|
+ jsonMap.put("user", user);
|
|
|
+ jsonMap.put("postDate", new Date());
|
|
|
+ jsonMap.put("message", message);
|
|
|
+ IndexRequest indexRequest = new IndexRequest(indexName, "_doc", UUID.randomUUID().toString())
|
|
|
+ .source(jsonMap);
|
|
|
+ return indexRequest;
|
|
|
+ }
|
|
|
+
|
|
|
+ private IndexRequest docData(String user, String message) throws IOException {
|
|
|
+ XContentBuilder builder = XContentFactory.jsonBuilder();
|
|
|
+ builder.startObject();
|
|
|
+ {
|
|
|
+ builder.field("user", user);
|
|
|
+ builder.timeField("postDate", new Date());
|
|
|
+ builder.field("message", message);
|
|
|
+ }
|
|
|
+ builder.endObject();
|
|
|
+ IndexRequest indexRequest = new IndexRequest(indexName, "_doc", UUID.randomUUID().toString())
|
|
|
+ .source(builder);
|
|
|
+ return indexRequest;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 用来处理搜索结果,转换成链表
|
|
|
+ *
|
|
|
+ * @param searchRequest
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public List<String> listSearchResult(SearchRequest searchRequest) {
|
|
|
+ // 获得response
|
|
|
+ SearchResponse searchResponse = null;
|
|
|
+ try {
|
|
|
+ searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ } finally {
|
|
|
+ if (client != null) {
|
|
|
+ try {
|
|
|
+ client.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从response中获得结果
|
|
|
+ List<String> list = new LinkedList<>();
|
|
|
+ SearchHits hits = searchResponse.getHits();
|
|
|
+ Iterator<SearchHit> iterator = hits.iterator();
|
|
|
+ while (iterator.hasNext()) {
|
|
|
+ SearchHit next = iterator.next();
|
|
|
+ list.add(next.getSourceAsString());
|
|
|
+ }
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void after() throws IOException {
|
|
|
+ client.close();
|
|
|
+ }
|
|
|
+
|
|
|
+}
|