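Spring Boot + Elasticsearch Quick Integration Guide
Introduction
Elasticsearch is a high-performance distributed search engine that is widely used in modern application development. Its main characteristics are:
- A distributed, real-time document store in which every field can be indexed and searched.
- A distributed, real-time search and analytics engine that supports a wide range of queries and aggregations.
- It scales to hundreds of server nodes and can handle petabytes of structured or unstructured data.
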
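Inverted Index
The inverted index is the core of Elasticsearch. Looking something up in a book normally goes table of contents -> chapter -> page number -> content; this is the idea behind a forward index.
But suppose you need to quickly find every page in a book that mentions the keyword elasticsearch. What then?
An inverted index solves this by mapping each word to the IDs of the documents that contain it.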
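The word-to-document-ID mapping can be sketched in a few lines of plain Java. This is only an illustration of the idea, not how Elasticsearch or Lucene implement it; the class name InvertedIndexSketch, the naive tokenizer, and the sample documents are all assumptions made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class InvertedIndexSketch {
    // term -> IDs of the documents containing that term (sorted for stable output)
    private final Map<String, SortedSet<Integer>> postings = new HashMap<>();

    // Tokenize naively on non-alphanumeric characters and record each term's document ID.
    public void add(int docId, String text) {
        for (String term : text.toLowerCase().split("[^a-z0-9]+")) {
            if (!term.isEmpty()) {
                postings.computeIfAbsent(term, k -> new TreeSet<>()).add(docId);
            }
        }
    }

    // Look up a single term: one map access instead of a scan over every document.
    public SortedSet<Integer> search(String term) {
        return postings.getOrDefault(term.toLowerCase(), Collections.emptySortedSet());
    }

    public static void main(String[] args) {
        InvertedIndexSketch index = new InvertedIndexSketch();
        index.add(1, "Elasticsearch is a distributed search engine");
        index.add(2, "Spring Boot integrates with Elasticsearch");
        index.add(3, "A book has chapters and pages");
        System.out.println(index.search("elasticsearch")); // [1, 2]
        System.out.println(index.search("book"));          // [3]
    }
}
```

The lookup cost depends on the number of distinct terms rather than on the number of documents, which is why the keyword search described above stays fast as the data grows.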
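This article walks through two ways of integrating Elasticsearch with Spring Boot: ElasticsearchRepository and ElasticsearchRestTemplate.
Examples
Using ElasticsearchRepository
ElasticsearchRepository is an interface provided by Spring Data. Extending it gives you the basic CRUD operations without writing any implementation code, which greatly simplifies development.
1. Create the Repository interface: extend ElasticsearchRepository, specify the entity class and the primary key type, and optionally declare custom query methods.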
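public interface DemoRepository extends ElasticsearchRepository<Demo, String> {
    // Derived query: Spring Data builds the query from the method name
    List<Demo> findByImsi(String imsi);
    // Custom DSL query declared with @Query; ?0, ?1 and ?2 are replaced by the method arguments in order.
    // Despite the method name, the range filter is applied to the costTime field.
    @Query("{\"bool\": {\"must\": [{\"match\": {\"imsi\": \"?0\"}}], \"filter\": {\"range\": {\"costTime\": {\"gte\": ?1, \"lte\": ?2}}}}}")
    List<Demo> findByImsiAndPriceRange(String imsi, double min, double max);
}
2. Service layer: inject the Repository into a service class and call its methods to perform data operations.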
@Service
public class DemoService {
    @Autowired
    private DemoRepository demoRepository;
    public Demo save(Demo demo) {
        return demoRepository.save(demo);
    }
    public Optional<Demo> findById(String id) {
        return demoRepository.findById(id);
    }
    public List<Demo> findByImsi(String imsi) {
        return demoRepository.findByImsi(imsi);
    }
    public Iterable<Demo> findAll() {
        return demoRepository.findAll();
    }
    public void delete(Demo demo) {
        demoRepository.delete(demo);
    }
    public List<Demo> findByImsiAndPriceRange(String imsi, double min, double max) {
        return demoRepository.findByImsiAndPriceRange(imsi, min, max);
    }
}
Using ElasticsearchRestTemplate
1. Configure ElasticsearchRestTemplate
@Configuration
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
    // No space after the colon: the default value is used verbatim, and HttpHost.create rejects blanks
    @Value("${spring.elasticsearch.uris:localhost:9200}")
    private String[] uris;
    @Bean(name = { "elasticsearchOperations", "elasticsearchRestTemplate" })
    public ElasticsearchRestTemplate elasticsearchTemplate() {
        return new ElasticsearchRestTemplate(elasticsearchClient());
    }
    @Override
    public RestHighLevelClient elasticsearchClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "password"));
        HttpHost[] httpHosts = Arrays.stream(uris).map(HttpHost::create).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(restClientBuilder);
    }
}
2. Service layer: inject ElasticsearchRestTemplate into a service class and build query conditions to perform the various data operations.
@Service
public class DslQueryService {
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;
    // 1. Basic match query
    public List<Demo> searchByKeyword(String keyword) {
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.matchQuery("imsi", keyword))
                .build();
        SearchHits<Demo> searchHits = elasticsearchRestTemplate.search(query, Demo.class);
        return searchHits.getSearchHits().stream()
                .map(SearchHit::getContent)
                .collect(Collectors.toList());
    }
    // 2. Compound bool query: each condition is added only when its argument is present
    public List<Demo> complexSearch(String imsi, Double min, Double max, String desc) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        if (imsi != null && !imsi.isEmpty()) {
            boolQuery.must(QueryBuilders.matchQuery("imsi", imsi));
        }
        if (min != null && max != null) {
            boolQuery.filter(QueryBuilders.rangeQuery("costTime").gte(min).lte(max));
        }
        if (desc != null && !desc.isEmpty()) {
            boolQuery.filter(QueryBuilders.termQuery("desc", desc));
        }
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQuery)
                .withPageable(PageRequest.of(0, 20))
                .build();
        SearchHits<Demo> searchHits = elasticsearchRestTemplate.search(query, Demo.class);
        return searchHits.getSearchHits().stream()
                .map(SearchHit::getContent)
                .collect(Collectors.toList());
    }
    // 3. Aggregation query example
    public void getCategoryCounts() {
        SearchRequest searchRequest = new SearchRequest("demo");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.must(QueryBuilders.termsQuery("imsi", "test","000","1"));
        searchSourceBuilder.query(boolQueryBuilder);
        searchSourceBuilder.size(0);
        searchSourceBuilder.trackTotalHits(true);
        Script scriptGroup = new Script("doc['imsi'].value");
        TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("by_imsi").script(scriptGroup).size(10);
        termsAggregationBuilder.subAggregation(AggregationBuilders.sum("sumTime").field("costTime"));
//        Map<String, String> bucketsPathsMap = new HashMap<>();
//        bucketsPathsMap.put("sumTime", "sumTime");
//        BucketSelectorPipelineAggregationBuilder selectorPipelineAggregationBuilder = PipelineAggregatorBuilders
//                .bucketSelector("having_count", bucketsPathsMap, new Script("params.sumTime<10000"));
//        termsAggregationBuilder.subAggregation(selectorPipelineAggregationBuilder);
        TopHitsAggregationBuilder topHit = new TopHitsAggregationBuilder("top_result").size(10);
        termsAggregationBuilder.subAggregation(topHit);
        searchSourceBuilder.aggregation(termsAggregationBuilder);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = elasticsearchRestTemplate.execute(client  -> {
            return client.search(searchRequest, RequestOptions.DEFAULT);
        });
        Terms terms = (Terms) searchResponse.getAggregations().get("by_imsi");
        for(Terms.Bucket bucket : terms.getBuckets()) {
            Aggregations aggregations = bucket.getAggregations();
            Sum sum = aggregations.get("sumTime");
            System.out.println(bucket.getKeyAsString()+":"+bucket.getDocCount()+":"+sum.getValueAsString());
        }
    }
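    // 4. Scroll query example
    // Pass null to start a new scroll; pass the scroll ID of the previous page to continue it.
    // Note: the method returns only the documents, so a caller that wants the next page
    // also needs searchScrollHits.getScrollId(), which is not exposed here.
    public List<Demo> scrollSearch(String scrollId, int pageSize) {
        SearchScrollHits<Demo> searchScrollHits;
        if (scrollId == null) {
            NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
                    .withQuery(QueryBuilders.matchAllQuery())
                    .withPageable(PageRequest.of(0, pageSize))
                    .build();
            searchScrollHits = elasticsearchRestTemplate.searchScrollStart(30000L, searchQuery, Demo.class, IndexCoordinates.of("demo"));
        } else {
            searchScrollHits = elasticsearchRestTemplate.searchScrollContinue(scrollId, 30000L, Demo.class, IndexCoordinates.of("demo"));
        }
        // Release the scroll context only once the scroll is exhausted; clearing it after
        // every page would invalidate the scroll ID needed to fetch the next page.
        // Otherwise the context expires on its own after the 30-second keep-alive.
        if (!searchScrollHits.hasSearchHits()) {
            elasticsearchRestTemplate.searchScrollClear(Collections.singletonList(searchScrollHits.getScrollId()));
        }
        return searchScrollHits.getSearchHits().stream()
                .map(SearchHit::getContent)
                .collect(Collectors.toList());
    }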
}
Test methods
@Slf4j
@SpringBootTest
public class TestDemo {
    @Autowired
    private DemoService demoService;
    @Autowired
    private DslQueryService dslQueryService;
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;
    @Test
    public void test1(){
        demoService.findById("vkwztJMBXiMbcxs-8Npt").ifPresent(demo -> log.info(demo.toString()));
    }
    @Test
    public void test2(){
        demoService.findByImsiAndPriceRange("test", 0.0, 50.0).forEach(demo -> log.info(demo.toString()));
    }
    @Test
    public void test3(){
        dslQueryService.searchByKeyword("test").forEach(demo -> log.info(demo.toString()));
    }
    @Test
    public void test4(){
        dslQueryService.getCategoryCounts();
    }
    @Test
    public void test5(){
        dslQueryService.scrollSearch(null, 10).forEach(demo -> log.info(demo.toString()));
    }
    @Test
    public void test6(){
        boolean exists = elasticsearchRestTemplate.indexOps(Demo.class).exists();
        if (!exists) {
            log.info(" createIndex.......");
            elasticsearchRestTemplate.indexOps(Demo.class).create();
            elasticsearchRestTemplate.indexOps(Demo.class).putMapping(Demo.class);
        } else {
            String indexName = elasticsearchRestTemplate.getIndexCoordinatesFor(Demo.class).getIndexName();
            log.info(" refreshIndex......");
            refreshAsync(indexName);
        }
    }
    @Test
    public void test7(){
        List<IndexQuery> list = new ArrayList<>();
        Demo bean = new Demo("test", "test", "test", "test", "test", 1L);
        IndexQuery indexQuery = new IndexQueryBuilder().withSource(JSONObject.toJSONString(bean)).build();
        list.add(indexQuery);
        elasticsearchRestTemplate.bulkIndex(list, Demo.class);
    }
    public void refreshAsync(String index) {
        try {
            elasticsearchRestTemplate.execute(client -> client.indices().refreshAsync(new RefreshRequest(index), RequestOptions.DEFAULT, new ActionListener<RefreshResponse>() {
                @Override
                public void onResponse(RefreshResponse refreshResponse) {
                }
                @Override
                public void onFailure(Exception e) {
                    // index fills the {} placeholder; the exception goes last so the stack trace is logged
                    log.error("failed callback to refresh index={}", index, e);
                }
            }));
        } catch (Exception e) {
            log.error("failed to refresh index={}", index, e);
        }
    }
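}
Aggregation scenario
- Use nested aggregations to group on three levels: time (date_histogram), domain, and IMSI (terms)
- Add count aggregations to each group to compute the total and the number of failures
- Use a filter aggregation to select the failed records (resulCode is not "0000")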
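Before reading the Elasticsearch query, it may help to see the same three-level grouping done in memory. The sketch below is plain Java, not Elasticsearch code: the AuthEvent class, its sample values, and the assumption that rtime is a "yyyy-MM-dd HH:mm:ss" string are all hypothetical and exist only to show what each bucket ends up containing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class NestedGroupingSketch {
    // Hypothetical, simplified log record holding only the fields the aggregation touches.
    static final class AuthEvent {
        final String rtime;      // assumed format: "yyyy-MM-dd HH:mm:ss"
        final String vpdndomain;
        final String imsi;
        final String resulCode;  // spelled as in the article's field name

        AuthEvent(String rtime, String vpdndomain, String imsi, String resulCode) {
            this.rtime = rtime;
            this.vpdndomain = vpdndomain;
            this.imsi = imsi;
            this.resulCode = resulCode;
        }
    }

    // Returns minute -> domain -> imsi -> {total, fail}.
    static Map<String, Map<String, Map<String, long[]>>> aggregate(List<AuthEvent> events) {
        Map<String, Map<String, Map<String, long[]>>> result = new TreeMap<>();
        for (AuthEvent e : events) {
            String minute = e.rtime.substring(0, 16); // truncate to "yyyy-MM-dd HH:mm"
            long[] counts = result
                    .computeIfAbsent(minute, k -> new TreeMap<>())
                    .computeIfAbsent(e.vpdndomain, k -> new TreeMap<>())
                    .computeIfAbsent(e.imsi, k -> new long[2]);
            counts[0]++;                                   // corresponds to total_count
            if (!"0000".equals(e.resulCode)) counts[1]++;  // corresponds to fail_count
        }
        return result;
    }

    public static void main(String[] args) {
        List<AuthEvent> events = Arrays.asList(
                new AuthEvent("2024-01-01 10:00:05", "a.example", "460001", "0000"),
                new AuthEvent("2024-01-01 10:00:40", "a.example", "460001", "1001"),
                new AuthEvent("2024-01-01 10:01:10", "a.example", "460001", "0000"),
                new AuthEvent("2024-01-01 10:00:20", "b.example", "460002", "2002"));
        aggregate(events).forEach((minute, domains) ->
                domains.forEach((domain, imsis) ->
                        imsis.forEach((imsi, c) -> System.out.println(
                                minute + " " + domain + " " + imsi + " total=" + c[0] + " fail=" + c[1]))));
        // 2024-01-01 10:00 a.example 460001 total=2 fail=1
        // 2024-01-01 10:00 b.example 460002 total=1 fail=1
        // 2024-01-01 10:01 a.example 460001 total=1 fail=0
    }
}
```

The Elasticsearch version that follows produces the same shape of result, but the grouping and counting run inside the cluster, so only the bucket summaries travel over the network.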
 
        // Build the base query conditions
        // (jsonParam, perms and publicPerms come from the enclosing method, which is not shown here)
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        // Time range condition: pad the minute-precision input out to full seconds
        String startTime = jsonParam.getString("startTime");
        String endTime = jsonParam.getString("endTime");
        if (!StringUtils.isEmpty(startTime) && !StringUtils.isEmpty(endTime)) {
            startTime = startTime + ":00";
            endTime = endTime + ":59";
            boolQueryBuilder.must(QueryBuilders.rangeQuery("rtime").gte(startTime).lte(endTime));
        }
        // Data-domain permission condition
        List<String> vpndomains = CommonTools.strList(perms);
        if (CollectionUtils.isNotEmpty(vpndomains)) {
            boolQueryBuilder.must(QueryBuilders.termsQuery("vpdndomain", vpndomains));
        }
        // Data source type condition
        if (!StringUtils.isEmpty(publicPerms) && publicPerms.contains("oldora")) {
            boolQueryBuilder.must(QueryBuilders.matchQuery("sourceType", 4));
        } else {
            boolQueryBuilder.must(QueryBuilders.matchQuery("sourceType", 1));
        }
        // Build the aggregation query
        NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                // Group by minute with a date_histogram aggregation
                .addAggregation(
                        AggregationBuilders.dateHistogram("by_minute")
                                .field("rtime")
                                .fixedInterval(DateHistogramInterval.MINUTE)
                                .format("yyyy-MM-dd HH:mm")
                                .subAggregation(
                                        AggregationBuilders.terms("by_domain")
                                                .field("vpdndomain")
                                                .subAggregation(
                                                        AggregationBuilders.terms("by_imsi")
                                                                .field("imsi")
                                                                .subAggregation(
                                                                        // Total count
                                                                        AggregationBuilders.count("total_count").field("_index")
                                                                )
                                                                .subAggregation(
                                                                        // Failure count
                                                                        AggregationBuilders.filter("fail_count",QueryBuilders.boolQuery()
                                                                                .mustNot(QueryBuilders.termQuery("resulCode", "0000")))
                                                                )
                                                )
                                )
                )
                .build();
        // Execute the query
        SearchHits<Authlog> searchHits = elasticsearchRestTemplate.search(searchQuery, Authlog.class, IndexCoordinates.of(authlog_index_name));
        // Process the aggregation results
        List<Map> statsList = new ArrayList<>();
        Integer overLimitCount = jsonParam.getInteger("overLimitCount");
        Histogram timeTerms = searchHits.getAggregations().get("by_minute");
        if (timeTerms != null) {
            for (Histogram.Bucket timeBucket : timeTerms.getBuckets()) {
                String rtime = timeBucket.getKeyAsString();
                Terms domainTerms = timeBucket.getAggregations().get("by_domain");
                for (Terms.Bucket domainBucket : domainTerms.getBuckets()) {
                    String vpdndomain = domainBucket.getKeyAsString();
                    Terms imsiTerms = domainBucket.getAggregations().get("by_imsi");
                    for (Terms.Bucket imsiBucket : imsiTerms.getBuckets()) {
                        String imsi = imsiBucket.getKeyAsString();
                        // Get the total count
                        ValueCount totalCount = imsiBucket.getAggregations().get("total_count");
                        long total = totalCount.getValue();
                        // Skip groups below the threshold (overLimitCount may be absent from the request)
                        if (overLimitCount != null && total < overLimitCount) continue;
                        // Get the failure count
                        Filter failCount = imsiBucket.getAggregations().get("fail_count");
                        long fail = failCount.getDocCount();
                        // Build the result entry
                        Map<Object, Object> result = MapUtil.builder()
                                .put("rtime", rtime)
                                .put("vpdndomain", vpdndomain)
                                .put("imsi", imsi)
                                .put("total", total)
                                .put("fail", fail)
                                .map();
                        statsList.add(result);
                    }
                }
            }
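        }
In practice, choose between the two approaches according to your needs:
- For simple CRUD operations and basic queries, prefer ElasticsearchRepository; its concise code structure gets the job done quickly.
- For complex query logic, aggregation analysis, or custom operations, ElasticsearchRestTemplate is the better fit, since it lets you build the DSL yourself and use the full range of search features.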