冀教网 - 河北教师网站 - 专注于冀教版课本资源

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 2|回复: 0

Elasticsearch系列---Java客户端代码Demo

[复制链接]

4万

主题

4万

帖子

12万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
122240
发表于 2020-5-23 21:27 | 显示全部楼层 |阅读模式
前言

前面历经33篇内容的讲解,与ES的请求操作都是在Kibana平台上用Restful请求完成的,一直没发布Java或python的客户端代码,Restful才是运用、理解ES核心功能最直接的表达方式,但实际项目中肯定是以Java/python来完成ES请求的发起与数据处理的,前面理解了ES的核心功能,后面Java API的使用将会非常简单,剩余的未覆盖的功能API,自行查阅文档即可。
概要

本篇讲解Elasticsearch的客户端API开发的一些示例,以Java语言为主,介绍一些最常用,最核心的API。
代码示例

引入依赖

我们以maven项目为例,添加项目依赖
  1.         org.elasticsearch        elasticsearch        6.3.1        org.elasticsearch.client        transport        6.3.1        log4j        log4j        1.2.17        org.apache.logging.log4j        log4j-core        2.12.1
复制代码
建立ES连接


  • 创建Settings对象,指定集群名称
  • 创建TransportClient对象,手动指定IP、端口即可
  1. Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();                TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
复制代码
如果集群的节点数比较多,为每个node分别指定IP、Port可行性不高,我们可以使用集群节点自动探查的功能,代码如下:
  1. // 将client.transport.sniff设置为true即可打开集群节点自动探查功能Settings settings = Settings.builder().put("client.transport.sniff", true)..put("cluster.name", "elasticsearch").build();// 只需要指定一个node就行TransportClient client = new PreBuiltTransportClient(settings);transport.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
复制代码
基本CRUD

最基本的CRUD代码,可以当作入门demo来写:
  1. /**         * 创建员工信息(创建一个document)         * @param client         */        private static void createEmployee(TransportClient client) throws Exception {                IndexResponse response = client.prepareIndex("company", "employee", "1")                                .setSource(XContentFactory.jsonBuilder()                                                .startObject()                                                        .field("name", "jack")                                                        .field("age", 27)                                                        .field("position", "technique")                                                        .field("country", "china")                                                        .field("join_date", "2017-01-01")                                                        .field("salary", 10000)                                                .endObject())                                .get();                System.out.println(response.getResult());         }                /**         * 获取员工信息         * @param client         * @throws Exception         */        private static void getEmployee(TransportClient client) throws Exception {                GetResponse response = client.prepareGet("company", "employee", "1").get();                System.out.println(response.getSourceAsString());         }                /**         * 修改员工信息         * @param client         * @throws Exception         */        private static void updateEmployee(TransportClient client) throws Exception {                UpdateResponse response = client.prepareUpdate("company", "employee", "1")                                 .setDoc(XContentFactory.jsonBuilder()                                                        .startObject()                                                                .field("position", "technique manager")                                                        .endObject())                                .get();                System.out.println(response.getResult());           }                /**         * 删除 员工信息         * @param client         * @throws Exception         */        private static void deleteEmployee(TransportClient client) throws Exception {                DeleteResponse response = client.prepareDelete("company", "employee", "1").get();                System.out.println(response.getResult());          }
复制代码
搜索

我们之前使用Restful的搜索,现在改用java实现,原有的Restful示例如下:
  1. GET /company/employee/_search{  "query": {    "bool": {      "must": [        {          "match": {            "position": "technique"          }        }      ],      "filter": {        "range": {          "age": {            "gte": 30,            "lte": 40          }        }      }    }  },  "from": 0,  "size": 1}
复制代码
等同于这样的Java代码:
  1. SearchResponse response = client.prepareSearch("company")        .setTypes("employee")        .setQuery(QueryBuilders.termQuery("position", "technique"))                 // Query        .setPostFilter(QueryBuilders.rangeQuery("age").from(30).to(40))     // Filter        .setFrom(0).setSize(60)        .get();
复制代码
聚合查询

聚合查询稍微麻烦一些,请求的封装和响应报文的解析,都是根据实际返回的结构来做的,例如下面的查询:
需求:

  • 按照country国家来进行分组
  • 在每个country分组内,再按照入职年限进行分组
  • 最后计算每个分组内的平均薪资
Restful的请求如下:
  1. GET /company/employee/_search{  "size": 0,  "aggs": {    "group_by_country": {      "terms": {        "field": "country"      },      "aggs": {        "group_by_join_date": {          "date_histogram": {            "field": "join_date",            "interval": "year"          },          "aggs": {            "avg_salary": {              "avg": {                "field": "salary"              }            }          }        }      }    }  }}
复制代码
用Java编写的请求如下:
  1. SearchResponse sr = node.client().prepareSearch()    .addAggregation(        AggregationBuilders.terms("by_country").field("country")        .subAggregation(AggregationBuilders.dateHistogram("by_year")            .field("dateOfBirth")            .dateHistogramInterval(DateHistogramInterval.YEAR)            .subAggregation(AggregationBuilders.avg("avg_children").field("children"))        )    )    .execute().actionGet();
复制代码
对响应的处理,则需要一层一层获取数据:
  1. Map aggrMap = searchResponse.getAggregations().asMap();        StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");        Iterator groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();                while(groupByCountryBucketIterator.hasNext()) {                Bucket groupByCountryBucket = groupByCountryBucketIterator.next();                                System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount());                                 Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");                 Iterator groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();                                 while(groupByJoinDateBucketIterator.hasNext()) {                        org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();                                                System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount());                                                 Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");                        System.out.println(avgSalary.getValue());                 }        }                client.close();}
复制代码
upsert请求
  1. private static void upsert(TransportClient transport) {        try {                IndexRequest index = new IndexRequest("book_shop", "books", "2").source(                                XContentFactory.jsonBuilder().startObject()                                                .field("name", "mysql从入门到删库跑路")                                                .field("tags", "mysql")                                                .field("price", 32.8)                                                .endObject());                UpdateRequest update = new UpdateRequest("book_shop", "books", "2")                                .doc(XContentFactory.jsonBuilder()                                                .startObject().field("price", 31.8)                                                .endObject())                                .upsert(index);                UpdateResponse response = transport.update(update).get();                System.out.println(response.getVersion());        } catch (IOException e) {                e.printStackTrace();        } catch (InterruptedException e) {                e.printStackTrace();        } catch (ExecutionException e) {                e.printStackTrace();        }}
复制代码
mget请求
  1. public static void mget(TransportClient transport) {        MultiGetResponse res = transport.prepareMultiGet()                        .add("book_shop", "books", "1")                        .add("book_shop", "books", "2")                        .get();        for (MultiGetItemResponse item : res.getResponses()) {                System.out.println(item.getResponse());        }}
复制代码
bulk请求
  1. public static void bulk(TransportClient transport) {        try {        BulkRequestBuilder bulk = transport.prepareBulk();        bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(                        XContentFactory.jsonBuilder().startObject()                                        .field("name", "设计模式从入门到拷贝代码")                                        .field("tags", "设计模式")                                        .field("price", 55.9)                                        .endObject()));                bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(                                XContentFactory.jsonBuilder().startObject()                                                .field("name", "架构设计从入门到google搜索")                                                .field("tags", "架构设计")                                                .field("price", 68.9)                                                .endObject()));                bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc((XContentFactory.jsonBuilder()                                .startObject().field("price", 32.8)                                .endObject())));                BulkResponse bulkRes = bulk.get();                if (bulkRes.hasFailures()) {                        System.out.println("Error...");                }        } catch (IOException e) {                e.printStackTrace();        }}
复制代码
scorll请求
  1. public static void scorll(TransportClient client) {        SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();        int batchCnt = 0;        do {            // 循环读取scrollid信息,直到结果为空                for(SearchHit hit: bookShop.getHits().getHits()) {                        System.out.println("batchCnt:" + ++batchCnt);                        System.out.println(hit.getSourceAsString());                }                bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();        } while (bookShop.getHits().getHits().length != 0);}
复制代码
搜索模板
  1. public static void searchTemplates(TransportClient client) {        Map params = new HashMap(10);        params.put("from",0);        params.put("size",10);        params.put("tags","java");        SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)                        .setScript("page_query_by_tags")                        .setScriptType(ScriptType.STORED)                        .setScriptParams(params)                        .setRequest(new SearchRequest())                        .get();        for(SearchHit hit:str.getResponse().getHits().getHits()) {                System.out.println(hit.getSourceAsString());        }}
复制代码
多条件组合查询
  1. public static void otherSearch(TransportClient client) {        SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();        SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();        SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();        SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();        System.out.println(response1.getHits().getHits()[0].getSourceAsString());        System.out.println(response2.getHits().getHits()[0].getSourceAsString());        System.out.println(response3.getHits().getHits()[0].getSourceAsString());        System.out.println(response4.getHits().getHits()[0].getSourceAsString());        // 多个条件组合        SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()                        .must(QueryBuilders.termQuery("tags", "java"))                        .mustNot(QueryBuilders.matchQuery("name", "跑路"))                        .should(QueryBuilders.matchQuery("name", "入门"))                        .filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();        System.out.println(response5.getHits().getHits()[0].getSourceAsString());}
复制代码
地理位置查询
  1. public static void geo(TransportClient client) {        GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);        List points = new ArrayList();        points.add(new GeoPoint(23,115));        points.add(new GeoPoint(25,113));        points.add(new GeoPoint(21,112));        GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location",points);        GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);        SearchResponse response = client.prepareSearch("location").setQuery(query3).get();        for(SearchHit hit:response.getHits().getHits()) {                System.out.println(hit.getSourceAsString());        }}
复制代码
小结

上述的那些案例demo,快速浏览一下即可,如果已经在开发ES相关的项目,还是多参考官方的API文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.3/index.html。上面有很详尽的API说明和使用Demo
专注Java高并发、分布式架构,更多技术干货分享与心得,请关注公众号:Java架构社区
可以扫左边二维码添加好友,邀请你加入Java架构社区微信群共同探讨技术
我的关键词 Elasticsearch系列---Java客户端代码Demo  新闻资讯 1834889-20200303074927076-1724862603


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|冀教网 - 河北教师网站 - 专注于冀教版课本资源  

GMT+8, 2020-5-30 04:41 , Processed in 0.210406 second(s), 24 queries .

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表