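【Introduction】
Elasticsearch is a search server built on Lucene: a real-time, distributed search and analytics engine that makes it possible to process big data at unprecedented speed. It is written in Java and uses Lucene at its core for all indexing and search. Its goal is to hide Lucene's complexity behind a simple RESTful API and make full-text search easy.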
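【Features】
1. Distributed, real-time document storage in which every field is indexed and searchable
2. A distributed, real-time analytics and search engine
3. Scales to hundreds of servers and handles petabytes of structured or unstructured data
4. Document-oriented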
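Feature 4 makes it worth comparing the structure of a relational database with that of ES. Types belong to the Elasticsearch 5.x era that this code (Jest 5.x) targets. From 6.0 onward an index can hold only one type, and types were later removed altogether.
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields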
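【Request format】
VERB: the HTTP method: GET, POST, PUT, HEAD or DELETE
1. PROTOCOL: http or https (https only works when there is an https proxy in front of Elasticsearch)
2. HOST: the hostname of any node in the Elasticsearch cluster, or localhost for a node on the local machine
3. PORT: the port the Elasticsearch HTTP service listens on, 9200 by default
4. QUERY_STRING: optional query parameters. For example, ?pretty pretty-prints the returned JSON so it is easier to read
5. BODY: a JSON request body (if the request needs one)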
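Together with the API endpoint path (PATH, for example _cluster/health), the parts above combine into a URL of the form PROTOCOL://HOST:PORT/PATH?QUERY_STRING. The VERB and BODY are sent separately from the URL. Below is a minimal plain-Java sketch that only assembles that URL string. The class EsRequestUrl is hypothetical and is not part of any ES client.

```java
/**
 * Hypothetical helper: assembles PROTOCOL://HOST:PORT/PATH?QUERY_STRING.
 * It sends nothing; the VERB and JSON BODY are supplied by whatever HTTP client you use.
 */
public final class EsRequestUrl {
    private EsRequestUrl() {}

    public static String build(String protocol, String host, int port, String path, String queryString) {
        // Accept the path with or without a leading slash
        String cleanPath = path.startsWith("/") ? path.substring(1) : path;
        StringBuilder sb = new StringBuilder()
                .append(protocol).append("://")
                .append(host).append(':').append(port)
                .append('/').append(cleanPath);
        // QUERY_STRING is optional, e.g. "pretty"
        if (queryString != null && !queryString.isEmpty()) {
            sb.append('?').append(queryString);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("http", "localhost", 9200, "_cluster/health", "pretty"));
    }
}
```

For example, build("http", "localhost", 9200, "_cluster/health", "pretty") yields http://localhost:9200/_cluster/health?pretty, which you would send with GET.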
That's enough of an ES sales pitch for now...
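【Use case】
Our project has fuzzy-search requirements on a very large data set: 80 million rows in a single table, growing by about 500,000 rows a day. Frankly, this is a big pitfall left behind by the early architecture. MySQL fuzzy queries are slow to begin with, and on this much data the performance was predictably poor. This glorious task fell to yours truly, and integrating ES and splitting the tables was unavoidable.
Anyway, I stepped into plenty of pitfalls along the way...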
【Implementation】
1: Kibana view statements
(omitted)
2: Dependencies (make sure the versions match each other)
dependencies {
compile('org.springframework.boot:spring-boot-starter-data-elasticsearch')
compile('org.springframework.data:spring-data-elasticsearch')
compile('io.searchbox:jest:5.3.3')
compile('com.sun.jna:jna:3.0.9')
}
3: Configuration
#es
# local environment
#spring.elasticsearch.jest.uris=http://192.168.90.201:9200
# test server environment
spring.elasticsearch.jest.uris=http://estest.data.autohome.com.cn:80
spring.elasticsearch.jest.read-timeout=10000
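4: Utility classes
4.1 ESDoc document interface
public interface ESDoc {
// Custom document ID: the implementing class needs a document-id field annotated with @JestId. If this returns null, ES generates the document ID
String getDocId();
// Name of the index the document belongs to, usually XX-XXX-yyyy.MM.dd
String getIndex();
// ES allows an index to hold documents of several types
String getType();
}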
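4.2 ESHits.java
import com.google.gson.annotations.SerializedName;
import java.util.List;
import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class ESHits {
private Integer total;
@SerializedName(value = "maxScore", alternate = "max_score")
private Double maxScore;
private List<Map<String, Object>> hits;
// getters/setters are generated by Lombok @Data
}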
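4.3 ESShards.java
import java.util.List;
import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class ESShards {
private Integer total;
private Integer successful;
private Integer skipped;
private Integer failed;
private List<Map<String, Object>> failures;
// getters/setters are generated by Lombok @Data
}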
4.4 Pagination.java
import java.util.List;
import lombok.Data;
@Data
public class Pagination<T> {
private int totalSize;
private List<T> list;// data rows
private int pageIndex;
private int pageSize;
public Pagination(){}
public Pagination(int totalSize, List<T> list, int pageIndex, int pageSize) {
this.totalSize = totalSize;
this.list = list;
this.pageIndex = pageIndex;
this.pageSize = pageSize;
}
// getters/setters are generated by Lombok @Data
}
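Pagination only stores totalSize, pageIndex and pageSize. The derived numbers are simple arithmetic. The sketch below uses a hypothetical helper, PageMath, and assumes pageIndex is zero-based. That assumption matches rowStartNum = pageSize * pageNum in the business code later on, if pageNum starts at 0.

```java
/** Hypothetical helpers for the page arithmetic behind Pagination and a MySQL LIMIT clause. */
public final class PageMath {
    private PageMath() {}

    /** Pages needed for totalSize rows at pageSize rows per page (ceiling division). */
    public static int totalPages(int totalSize, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        return (totalSize + pageSize - 1) / pageSize;
    }

    /** Row offset for LIMIT, assuming a zero-based pageIndex. */
    public static int offset(int pageIndex, int pageSize) {
        return pageIndex * pageSize;
    }

    public static void main(String[] args) {
        System.out.println(totalPages(101, 20)); // 6
        System.out.println(offset(2, 20));       // 40
    }
}
```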
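4.5 ElasticSearchResult.java
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class ElasticSearchResult {
private Integer took;
// ES reports the timeout flag as "timed_out"
@SerializedName(value = "timeOut", alternate = {"timed_out"})
private Boolean timeOut;
@SerializedName(value = "shards", alternate = {"_shards"})
private ESShards shards;
private ESHits hits;
// getters/setters are generated by Lombok @Data
}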
4.6 ES business entity ESNlpInfoDoc.java
import io.searchbox.annotations.JestId;
/**
* @Author: LvFang
* @Date: Created in 2018/4/26.
* @Description:
*/
public class ESNlpInfoDoc implements ESDoc {
@JestId
private String docId;
private String username;
private String title;
private String content;
private Long ctime;
public ESNlpInfoDoc(){}
public ESNlpInfoDoc(String docId,String username,String title,String content){
this.docId=docId;
this.username=username;
this.title=title;
this.content=content;
}
public ESNlpInfoDoc(String docId,String username,String title,String content,Long ctime){
this.docId=docId;
this.username=username;
this.title=title;
this.content=content;
this.ctime=ctime;
}
@Override
public String getDocId() {
return docId;
}
@Override
public String getIndex() {
//return String.format(ESConstants.INDEX_FORMAT, ESConstants.INDEX_TYPE_MOXIE, this.date);
return ESConstants.INDEX_NAME_NLP;
}
@Override
public String getType() {
return ESConstants.INDEX_TYPE_NLPINFO;
}
// getters/setters omitted
}
4.7 ES constants ESConstants.java
/**
* @Author: LvFang
* @Date: Created in 2018/4/26.
* @Description:
*/
public class ESConstants {
public static final String INDEX_FORMAT = "loan-%s-%s";// loan-debt-yyyy.MM.dd
public static final String INDEX_NAME_NLP = "nlpindex_";// full index
public static final String INDEX_NAME_NLP_NEWLY = "nlpindex_newly";// data from the last week (maps to the info table only, not the his table)
public static final String INDEX_NAME_NLP_ALL_CLM = "nlpindex";// full index
public static final String INDEX_TYPE_NLPINFO = "nlpinfo";
public static final String INDEX_TYPE_NLPLOG = "nlplog";
public static final String INDEX_TYPE_NLPDICT = "nlpdict";
public static final String INDEX_TYPE_NLPUSER = "nlpuser";
public static final Long BATCH_FAILURE_ID = -1L;// document ID recorded when a whole batch fails, to tell it apart from ordinary per-document failures
public static final Integer INDEX_FAILURE = 1;// flag for failed records that a scheduled job adds back into the index
}
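The ESDoc.getIndex() comment mentions names like XX-XXX-yyyy.MM.dd, and INDEX_FORMAT = "loan-%s-%s" hints at one index per day. ESNlpInfoDoc ended up using a fixed index instead, as its commented-out line shows. Here is a sketch of generating such daily names. DailyIndexNames is illustrative, and the category "debt" is taken from the loan-debt-yyyy.MM.dd comment. A multi-day query could add each generated name to the search, since Jest's Search.Builder accepts several indices.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for daily index names such as loan-debt-2018.04.26. */
public final class DailyIndexNames {
    private DailyIndexNames() {}

    // Same pattern as ESConstants.INDEX_FORMAT
    private static final String INDEX_FORMAT = "loan-%s-%s";
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /** Index name for one category and one day. */
    public static String forDay(String category, LocalDate day) {
        return String.format(INDEX_FORMAT, category, DAY.format(day));
    }

    /** One index name per day from start to end, both inclusive. */
    public static List<String> forRange(String category, LocalDate start, LocalDate end) {
        List<String> names = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            names.add(forDay(category, d));
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(forRange("debt", LocalDate.of(2018, 4, 25), LocalDate.of(2018, 4, 26)));
    }
}
```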
4.8 Jest utility class JestService.java
import com.google.gson.Gson;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.cluster.Health;
import io.searchbox.cluster.NodesStats;
import io.searchbox.core.*;
import io.searchbox.core.search.sort.Sort;
import io.searchbox.indices.ClearCache;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.CreateIndex.Builder;
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.Optimize;
import io.searchbox.indices.mapping.PutMapping;
import io.searchbox.params.Parameters;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Service
public class JestService {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private JestClient jestClient;
private final static String SCROLL = "1m";
private final static int BATCH_SIZE = 5000;
private final static int MAX_SIZE = 10000;
/**
* Get cluster health
* @throws IOException
*/
public JestResult health() throws IOException {
Health health = new Health.Builder().build();
JestResult result = jestClient.execute(health);
return result;
}
/**
* Get node stats
* @throws IOException
*/
public JestResult nodesStats() throws IOException {
NodesStats nodesStats = new NodesStats.Builder().build();
JestResult result = jestClient.execute(nodesStats);
return result;
}
/**
* Create an index
* @param indexName
* @param settings index settings source (optional)
* @return the execution result JSON
* @throws IOException
*/
public JestResult createIndex(String indexName, String settings) throws IOException {
Builder builder = new Builder(indexName);
if(StringUtils.isNotBlank(settings)){
builder.settings(Settings.builder().loadFromSource(settings));
}
CreateIndex createIndex = builder.build();
JestResult result = jestClient.execute(createIndex);
return result;
}
/**
* Create an index mapping
* @param indexName
* @param type
* @param mappings
* @return
* @throws IOException
*/
public JestResult putMapping(String indexName, String type, String mappings)
throws IOException {
PutMapping putMapping = new PutMapping.Builder(indexName, type, mappings).build();
JestResult result = jestClient.execute(putMapping);
return result;
}
/**
* Check whether an index exists
* @param indexName
* @return the result JSON
* @throws IOException
*/
public JestResult isExists(String indexName) throws IOException {
IndicesExists indicesExists = new IndicesExists.Builder(indexName).build();
JestResult result = jestClient.execute(indicesExists);
return result;
}
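/**
* Optimize the index
* Note: the _optimize API was replaced by _forcemerge and removed in Elasticsearch 5.0, so check that your cluster still accepts it
* @return the result, or null if the request failed
*/
public JestResult optimizeIndex() {
Optimize optimize = new Optimize.Builder().build();
JestResult result = null;
try {
result = jestClient.execute(optimize);
} catch (IOException e) {
logger.error("Optimize index failed", e);
}
return result;
}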
/**
* Clear index caches
* @return the result, or null if the request failed
*/
public JestResult clearCache() {
ClearCache clearCache = new ClearCache.Builder().build();
JestResult result = null;
try {
result = jestClient.execute(clearCache);
} catch (IOException e) {
logger.error("Clear cache failed", e);
}
return result;
}
/**
* Index (add) a document
* @param indexName
* @param type
* @param doc the document to index
* @return the execution result JSON
* @throws IOException
*/
public JestResult insertDocument(String indexName, String type, ESDoc doc) throws IOException {
Index index = new Index.Builder(doc).index(indexName).type(type).build();
JestResult result = jestClient.execute(index);
return result;
}
/**
* Bulk index documents into one index/type
* @param indexName
* @param type
* @param docs
* @return
* @throws IOException
*/
public JestResult bulkIndex(String indexName, String type, List<? extends ESDoc> docs)
throws IOException {
Bulk.Builder builder = new Bulk.Builder().defaultIndex(indexName).defaultType(type);
for(ESDoc doc: docs){
builder.addAction(new Index.Builder(doc).build());
}
JestResult result = jestClient.execute(builder.build());
return result;
}
/**
* Bulk index documents; each document supplies its own index and type
* @param docs
* @return
* @throws IOException
*/
public JestResult bulkIndex(List<? extends ESDoc> docs)
throws IOException {
Bulk.Builder builder = new Bulk.Builder();
for(ESDoc doc: docs){
builder.addAction(new Index.Builder(doc).index(doc.getIndex()).type(doc.getType()).build());
}
JestResult result = jestClient.execute(builder.build());
return result;
}
/**
* Delete a document
* @param indexName
* @param type
* @param id
* @return
* @throws IOException
*/
public JestResult deleteDocument(String indexName, String type, String id) throws IOException {
Delete delete = new Delete.Builder(id).index(indexName).type(type).build();
JestResult result = jestClient.execute(delete);
return result;
}
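/**
* Update a document (partial update)
* The _update API expects the changed fields wrapped as {"doc": {...}}, so the document is wrapped before it is sent
* @param indexName
* @param type
* @param doc
* @return
* @throws IOException
*/
public JestResult updateDocument(String indexName, String type, ESDoc doc)
throws IOException {
Update update = new Update.Builder(java.util.Collections.singletonMap("doc", doc)).index(indexName).type(type).id(doc.getDocId()).build();
JestResult result = jestClient.execute(update);
return result;
}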
/**
* Get a document by id
* @param indexName
* @param type
* @param docId
* @return
* @throws IOException
* To convert the result into an object:
* Article article = result.getSourceAsObject(Article.class);
*/
public JestResult getDocument(String indexName, String type, String docId) throws IOException {
Get get = new Get.Builder(indexName, docId).type(type).build();
JestResult result = jestClient.execute(get);
return result;
}
/**
* Simple search
* @param indexName
* @param type
* @param query the query as a JSON string
* @return
* @throws IOException
* How to read the hits:
* List<SearchResult.Hit<Article, Void>> hits = result.getHits(Article.class);
* or
* List<Article> articles = result.getSourceAsObjectList(Article.class);
*/
public SearchResult simpleSearch(String indexName, String type, String query)
throws IOException {
Search search = new Search.Builder(query)
// multiple index or types can be added.
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Match all documents
* @param indexName
* @param type
* @return
* @throws IOException
*/
public SearchResult searchAll ( String indexName , String type ) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
Search search = new Search.Builder(
searchSourceBuilder.toString())
.addIndex(indexName)
.addType(type).build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Match all documents, sorted by the given field (descending)
* @param indexName
* @param type
* @param sortField
* @return
* @throws IOException
*/
public SearchResult searchAllDesc ( String indexName , String type , String sortField ) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
Search search = new Search.Builder(
searchSourceBuilder.toString()).addSort(new Sort(sortField,Sort.Sorting.DESC))
.addIndex(indexName)
.addType(type).build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Match all documents, sorted by the given field (ascending)
* @param indexName
* @param type
* @param sortField
* @return
* @throws IOException
*/
public SearchResult searchAllAsc ( String indexName , String type , String sortField ) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
Search search = new Search.Builder(
searchSourceBuilder.toString()).addSort(new Sort(sortField,Sort.Sorting.ASC))
.addIndex(indexName)
.addType(type).build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Get the maximum value of a numeric field (the result is returned under the aggregation name "maxCtime")
* @param indexName
* @param type
* @param field
* @return
* @throws IOException
*/
public SearchResult searchMax ( String indexName , String type , String field ) throws IOException {
String query = "{\n" +
" \"size\": 0,\n" +
" \"aggs\": {\n" +
" \"maxCtime\": {\n" +
" \"max\": {\n" +
" \"field\": \"" + field + "\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }";
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Exact (term) match on a single field
* @param field
* @param keyword
* @return
*/
public SearchResult searchInfoByField(String indexName , String type ,String field,Object keyword) throws IOException{
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// QueryBuilder queryBuilder = QueryBuilders.termQuery(field+".keyword", keyword);// exact match on the keyword sub-field
QueryBuilder queryBuilder = QueryBuilders.termQuery(field, keyword);// exact single-value match
searchSourceBuilder.query(queryBuilder).size(MAX_SIZE);
String query = searchSourceBuilder.toString();
logger.debug(query);
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Fuzzy (wildcard) search with a time filter
* @param indexName
* @param type
* @param field
* @param keyWord
* @param startTime
* @param endTime
* @return
* @throws IOException
*/
public SearchResult blurSearch ( String indexName , String type , String field , String keyWord,Long startTime,Long endTime) throws IOException {
// Approach 5: build the query with the Java API (it produces the same JSON you would send from the view tool)
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// time filter
QueryBuilder timeQuery = QueryBuilders.rangeQuery("ctime").from(startTime).to(endTime);
// text filter; a leading "*" makes ES scan many terms, so this gets slow on large indices
QueryBuilder contentBuilder = QueryBuilders.wildcardQuery(field, "*"+keyWord+"*");
QueryBuilder boolQuery = QueryBuilders.boolQuery().must(timeQuery).must(contentBuilder);
searchSourceBuilder.query(boolQuery).size(MAX_SIZE);
String query = searchSourceBuilder.toString();
logger.debug(query);
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Get documents whose ctime is less than or equal to the given timestamp
* @param indexName
* @param type
* @param startTime
* @return
* @throws IOException
*/
public SearchResult getlteByCtime ( String indexName , String type ,Long startTime) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// time filter
QueryBuilder timeQuery = QueryBuilders.rangeQuery("ctime").lte(startTime);
QueryBuilder boolQuery = QueryBuilders.boolQuery().must(timeQuery);
searchSourceBuilder.query(boolQuery).size(MAX_SIZE);
String query = searchSourceBuilder.toString();
logger.debug(query);
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Get documents whose value in the given long field is greater than or equal to startTime
* @param indexName
* @param type
* @param field
* @param startTime
* @return
* @throws IOException
*/
public SearchResult getgteByField ( String indexName , String type ,String field,Long startTime) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// range filter
QueryBuilder timeQuery = QueryBuilders.rangeQuery(field).gte(startTime);
QueryBuilder boolQuery = QueryBuilders.boolQuery().must(timeQuery);
searchSourceBuilder.query(boolQuery).size(MAX_SIZE);
String query = searchSourceBuilder.toString();
logger.debug(query);
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Get documents whose value in the given long field is less than or equal to startTime
* @param indexName
* @param type
* @param field
* @param startTime
* @return
* @throws IOException
*/
public SearchResult getlteByField ( String indexName , String type , String field, Long startTime) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// range filter
QueryBuilder timeQuery = QueryBuilders.rangeQuery(field).lte(startTime);
QueryBuilder boolQuery = QueryBuilders.boolQuery().must(timeQuery);
searchSourceBuilder.query(boolQuery).size(MAX_SIZE);
String query = searchSourceBuilder.toString();
logger.debug(query);
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
/**
* Fuzzy (wildcard) search without a time filter
* @param indexName
* @param type
* @param field
* @param keyWord
* @return
* @throws IOException
*/
public SearchResult blurSearch ( String indexName , String type , String field , String keyWord) throws IOException {
// Approach 1
// QueryBuilder queryBuilder = QueryBuilders.fuzzyQuery(field+".keyword", keyWord);
// Search search = new Search.Builder(queryBuilder.toString()).addIndex(indexName).addType(type).build();
// Approach 2
// Term term=new Term(field+".keyword", "*"+keyWord+"*");
// WildcardQuery query=new WildcardQuery(term);
// Search search = new Search.Builder(query.toString()).addIndex(indexName).addType(type).build();
// Approach 3
// QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(field+".keyword", "*"+keyWord+"*"));
// Search search = new Search.Builder(
// queryBuilder.toString())
// .addIndex(indexName)
// .addType(type).build();
// Approach 4
// WildcardQueryBuilder queryBuilder = QueryBuilders.wildcardQuery(field+".keyword", "*"+keyWord+"*");
// Search search = new Search.Builder(queryBuilder.toString())
// .addIndex(indexName)
// .addType(type).build();
// Approaches 1 to 4 send a bare query (no {"query": ...} wrapper) as the search body, which the _search endpoint does not accept as-is
// Approach 5: build the full search source with the API (the same JSON you would send from the view tool)
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// QueryBuilder queryBuilder = QueryBuilders.wildcardQuery(field+".keyword", "*"+keyWord+"*");
QueryBuilder queryBuilder = QueryBuilders.wildcardQuery(field, "*"+keyWord+"*");
// QueryBuilder queryBuilder = QueryBuilders.matchQuery(field,keyWord);
searchSourceBuilder.query(queryBuilder).size(MAX_SIZE);
String query = searchSourceBuilder.toString();
logger.debug(query);
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.build();
SearchResult result = jestClient.execute(search);
return result;
}
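/**
* Scroll through the hits of a query (BATCH_SIZE per request, 1m scroll keep-alive), stopping once at least MAX_SIZE documents have been collected
* @param indexName
* @param type
* @param query
* @param clazz
* @param <T>
* @return
* @throws IOException
*/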
public <T extends ESDoc> List<T> scanAndScrollSearch(String indexName, String type, String query, Class<T> clazz)
throws IOException {
List<T> ret = new ArrayList<>();
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(type)
.setParameter(Parameters.SIZE, BATCH_SIZE)
.setParameter(Parameters.SCROLL, SCROLL)
.build();
SearchResult searchResult = jestClient.execute(search);
if(!searchResult.isSucceeded()){
logger.error(searchResult.getErrorMessage());
return ret;
}
String scrollId = searchResult.getJsonObject().get("_scroll_id").getAsString();
ElasticSearchResult esr = new Gson().fromJson(searchResult.getJsonString(), ElasticSearchResult.class);
logger.info("ES search took {} ms, timed out: {}, total shards: {}, successful shards: {}, skipped shards: {}, failed shards: {}, total hits: {}, max score: {}, fetched in this batch: {}", esr.getTook(), esr.getTimeOut(), esr.getShards().getTotal(), esr.getShards().getSuccessful(), esr.getShards().getSkipped(), esr.getShards().getFailed(), esr.getHits().getTotal(), esr.getHits().getMaxScore(), esr.getHits().getHits().size());
List<T> curList = searchResult.getSourceAsObjectList(clazz, false);
int curPageSize = curList.size();
ret.addAll(curList);
// Keep scrolling until a batch comes back empty or MAX_SIZE documents have been collected
while(curPageSize != 0 && ret.size() < MAX_SIZE) {
SearchScroll scrollSearch = new SearchScroll.Builder(scrollId, SCROLL).build();
JestResult scrollResult = jestClient.execute(scrollSearch);
if(!scrollResult.isSucceeded()){
logger.error(scrollResult.getErrorMessage());
break;
}
scrollId = scrollResult.getJsonObject().get("_scroll_id").getAsString();
esr = new Gson().fromJson(scrollResult.getJsonString(), ElasticSearchResult.class);
logger.info("ES search took {} ms, timed out: {}, total shards: {}, successful shards: {}, skipped shards: {}, failed shards: {}, total hits: {}, max score: {}, fetched in this batch: {}", esr.getTook(), esr.getTimeOut(), esr.getShards().getTotal(), esr.getShards().getSuccessful(), esr.getShards().getSkipped(), esr.getShards().getFailed(), esr.getHits().getTotal(), esr.getHits().getMaxScore(), esr.getHits().getHits().size());
curList = scrollResult.getSourceAsObjectList(clazz, false);
curPageSize = curList.size();
ret.addAll(curList);
}
return ret;
}
/**
* Get a single document by id and convert it to clazz
* @param index
* @param type
* @param id
* @param clazz
* @param <T>
* @return the document, or null if it was not found or the request failed
*/
public <T extends ESDoc> T getESDoc(String index, String type, String id, Class<T> clazz) {
try {
JestResult result = getDocument(index, type, id);
if (result.isSucceeded()) {
T doc = result.getSourceAsObject(clazz);
return doc;
}
} catch (IOException e) {
logger.error("Failed to read {}:{}:{} from ES", index, type, id, e);
}
return null;
}
/**
* Search and wrap the matching documents in a Pagination
* @param index
* @param type
* @param query
* @param clazz
* @return the result, or null if the search failed
*/
public <T extends ESDoc> Pagination<T> search(String index, String type, String query, Class<T> clazz){
Pagination<T> ret = null;
try {
SearchResult result = simpleSearch(index, type, query);
if(result.isSucceeded()) {
ElasticSearchResult esr = new Gson().fromJson(result.getJsonString(), ElasticSearchResult.class);
logger.info("ES search took {} ms, timed out: {}, total shards: {}, successful shards: {}, skipped shards: {}, failed shards: {}, total hits: {}, max score: {}", esr.getTook(), esr.getTimeOut(), esr.getShards().getTotal(), esr.getShards().getSuccessful(), esr.getShards().getSkipped(), esr.getShards().getFailed(), esr.getHits().getTotal(), esr.getHits().getMaxScore());
if(esr.getShards().getFailed() == 0) {
ret = new Pagination<>();
ret.setList(result.getSourceAsObjectList(clazz, false));
ret.setTotalSize(esr.getHits().getTotal());
}else{
logger.error("Search failures: {}", new Gson().toJson(esr.getShards().getFailures()));
}
}else{
logger.error("ES query failed {}[{}]", result.getErrorMessage(), result.getJsonString());
}
} catch (IOException e) {
logger.error("ES search failed", e);
}
return ret;
}
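/**
* Example: build a compound bool query (term and terms filters, a must_not range, and a
* nested bool whose should ranges must match at least once) and return it as JSON
*/
public String generateQuery(){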
SearchSourceBuilder builder = new SearchSourceBuilder();
TermQueryBuilder termQuery = QueryBuilders.termQuery("beat.hostname", "bjzw_99_138");
TermsQueryBuilder termsQuery = QueryBuilders
.termsQuery("response_code", "301", "302", "404", "500");
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("response_duration").gte(0).lte(1);
RangeQueryBuilder rangeQuery1 = QueryBuilders.rangeQuery("response_duration").gte(2).lte(3);
RangeQueryBuilder rangeQuery2 = QueryBuilders.rangeQuery("response_duration").gte(4).lte(5);
RangeQueryBuilder rangeQuery3 = QueryBuilders.rangeQuery("response_duration").gte(8).lte(10);
BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery().should(rangeQuery1).should(rangeQuery2).should(rangeQuery3);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().filter(termQuery).filter(termsQuery).mustNot(rangeQuery).filter(shouldQuery);
return builder.query(boolQuery).toString();
}
}
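Both blurSearch methods splice the raw keyword into "*" + keyWord + "*". If a user types * or ?, those characters act as wildcards too. The sketch below escapes them with a backslash, which Lucene's wildcard syntax treats as an escape character. It then builds roughly the JSON body that the time-filtered blurSearch sends. The class WildcardQueryJson is hypothetical and uses plain Java with no ES client. The field name ctime and the size follow the article. SearchSourceBuilder.toString() formats its output differently (for instance, it writes the range with from/to and inclusion flags), but the meaning is the same. The Elasticsearch documentation also warns that patterns beginning with * are slow, which fits the ES timings in the report below.

```java
/** Hypothetical sketch of a wildcard + time-range search body with escaped user input. */
public final class WildcardQueryJson {
    private WildcardQueryJson() {}

    /** Escapes the wildcard metacharacters * and ? (and the escape char itself) so they match literally. */
    public static String escapeWildcard(String keyword) {
        StringBuilder sb = new StringBuilder();
        for (char c : keyword.toCharArray()) {
            if (c == '*' || c == '?' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.toString();
    }

    /** Minimal JSON string literal: escapes quotes, backslashes and control characters. */
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Roughly the body of blurSearch(index, type, field, keyWord, startTime, endTime). */
    public static String blurSearchBody(String field, String keyword, long startTime, long endTime, int size) {
        String pattern = "*" + escapeWildcard(keyword) + "*";
        return "{\"size\":" + size
                + ",\"query\":{\"bool\":{\"must\":["
                + "{\"range\":{\"ctime\":{\"gte\":" + startTime + ",\"lte\":" + endTime + "}}},"
                + "{\"wildcard\":{" + jsonString(field) + ":{\"value\":" + jsonString(pattern) + "}}}"
                + "]}}}";
    }

    public static void main(String[] args) {
        System.out.println(blurSearchBody("content", "5*", 1524672000000L, 1524758400000L, 10000));
    }
}
```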
5: Business implementation
/**
* Fuzzy-search ES by content
*
* @param content
* @param startTime
* @param endTime
* @return matching document ids, in descending order
*/
private List<Long> getESInfoByContent(String index , String type ,String content, Long startTime, Long endTime) {
List<Long> idList = new ArrayList<>();
try {
if (StringUtils.isNotBlank(content)) {
// Run the ES fuzzy search and collect the ids of the matching documents
JestResult jestResult = jestService.blurSearch(index, type, "content", content, startTime, endTime);
List<ESNlpInfoDoc> list = jestResult.getSourceAsObjectList(ESNlpInfoDoc.class);
for (ESNlpInfoDoc doc : list) {
idList.add(Long.parseLong(doc.getDocId()));
}
}
} catch (Exception e) {
logger.error("ES query by content error", e);
}
// Descending order
idList.sort(Collections.reverseOrder());
return idList;
}
/**
* Search ES by title
*
* @param title
* @param startTime
* @param endTime
* @return matching document ids, in descending order
*/
private List<Long> getESInfoByTitle(String index , String type ,String title, Long startTime, Long endTime) {
List<Long> idList = new ArrayList<>();
try {
if (StringUtils.isNotBlank(title)) {
// Exact title match
// JestResult jestResult = jestService.searchInfoByField(ESConstants.INDEX_NAME_NLP, ESConstants.INDEX_TYPE_NLPINFO,"title",title);
// Fuzzy title match
JestResult jestResult = jestService.blurSearch(index, type,"title", title, startTime, endTime);
List<ESNlpInfoDoc> list = jestResult.getSourceAsObjectList(ESNlpInfoDoc.class);
for (ESNlpInfoDoc doc : list) {
idList.add(Long.parseLong(doc.getDocId()));
}
}
} catch (Exception e) {
logger.error("ES query by title error", e);
}
// Descending order
idList.sort(Collections.reverseOrder());
return idList;
}
//-------------------------------------Business usage-------------------------------------
/**
* Second optimization: get the matching ids from the ES index, then query MySQL by those ids
*/
public RcASMsgVo infoRequestV2(Integer function,
String businessId,
String uName,
String curStatus,
String manualConfirm,
String contentType,
String stime,
String etime,
String type,
String searchName, // 5 = content, 6 = title
String searchContent,
int pageSize,
int pageNum) throws Exception {
RcASMsgVo rcASMsgVo = new RcASMsgVo(Constants.CODE_FAIL, Constants.MSG_FAIL, null);
// Verify that the user name matches the user id; if not, return an empty result
if (!checkAdmin(uName, businessId)) {
rcASMsgVo.setMsg(Constants.MSG_NAME_ID_NO_ALIKE);
return rcASMsgVo;
}
// Millisecond timestamps for the ES range filter (null when not supplied)
Long startTime = stime != null ? Long.valueOf(stime) : null;
Long endTime = etime != null ? Long.valueOf(etime) : null;
// Convert the time format
if (stime != null && etime != null) {
stime = timeTransform(stime);
etime = timeTransform(etime);
} else {
stime = null;
etime = null;
}
// Decide which table to query from the date range (one week is the cutoff)
String isHis = getIsHis(stime, etime);
// If a fuzzy-search parameter is given, filter with ES first
List<Long> idList;
//////////////////////////////////////////////////////////ES usage starts////////////////////////////////////////////////////////////
// (1) Fuzzy search on content
if ("5".equals(searchName)) {
// (optimization v2) query the split ES indices
if("0".equals(isHis)){
idList = getESInfoByContent(ESConstants.INDEX_NAME_NLP_NEWLY, ESConstants.INDEX_TYPE_NLPINFO,searchContent, startTime, endTime);
} else {
idList = getESInfoByContent(ESConstants.INDEX_NAME_NLP, ESConstants.INDEX_TYPE_NLPINFO,searchContent, startTime, endTime);
}
// (optimization v1) single ES index
// idList = getESInfoByContent(ESConstants.INDEX_NAME_NLP, ESConstants.INDEX_TYPE_NLPINFO,searchContent, startTime, endTime);
logger.info("List size after ES search: " + idList.size());
// (2) Fuzzy search on title
} else if ("6".equals(searchName)) {
if("0".equals(isHis)){
idList = getESInfoByTitle(ESConstants.INDEX_NAME_NLP_NEWLY, ESConstants.INDEX_TYPE_NLPINFO,searchContent, startTime, endTime);
} else {
idList = getESInfoByTitle(ESConstants.INDEX_NAME_NLP, ESConstants.INDEX_TYPE_NLPINFO,searchContent, startTime, endTime);
}
logger.info("List size after ES search: " + idList.size());
// (3) Anything else
} else {
// Old query path
return infoRequest(function, businessId, uName, curStatus, manualConfirm, contentType, startTime == null ? null : startTime.toString(), endTime == null ? null : endTime.toString(), type, searchName, searchContent, pageSize, pageNum);
}
//////////////////////////////////////////////////////////ES usage ends////////////////////////////////////////////////////////////
// Total count
int totalNum = 0;
// Offset for LIMIT paging (pageNum is treated as zero-based)
int rowStartNum = pageSize * pageNum;
// Relational query, filtered by the ids returned from ES
List<RcAdminNlpInfo> rcAdminNlpInfoList = new ArrayList<>();
if (!idList.isEmpty()) {
totalNum = rcAdminNlpInfoDao.countTotalNumNew(idList, businessId, curStatus, manualConfirm, contentType, stime, etime, type, isHis);
rcAdminNlpInfoList = rcAdminNlpInfoDao.selectByParamNew(idList, businessId, curStatus, manualConfirm, contentType, stime, etime, type, rowStartNum, pageSize, isHis);
}
logger.info("total num is : " + totalNum);
// Replace codes with text via the dictionary cache and add highlight tags
List<RcAdminNlpInfo> list = rcAdminNlpInfoFormat(rcAdminNlpInfoList);
// Wrap the data
RcASPageVo rcASPage = new RcASPageVo(totalNum, pageSize, pageNum, list);
rcASMsgVo.setCode(Constants.CODE_SUCCESS);
rcASMsgVo.setMsg(Constants.MSG_SUCCESS);
rcASMsgVo.setData(rcASPage);
// logger.info("Query result: {}", rcASPage);
// Return the data object to the front end
return rcASMsgVo;
}
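The old approach ran the fuzzy query directly in MySQL. After integrating ES, the fuzzy query runs in ES first as a filter and returns the list of matching ids. MySQL then queries those ids with IN. This keeps fuzzy text matching out of MySQL and makes the query faster. The ES index can also be split to mirror the MySQL table split.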
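The ES-then-MySQL flow boils down to turning the id list from ES into an id IN (...) condition. The DAO methods (countTotalNumNew, selectByParamNew) are not shown. The sketch below therefore only illustrates the idea in a plain-JDBC style, and the table name is hypothetical. It emits one ? placeholder per id, so the ids are bound as parameters instead of being concatenated into SQL. Because blurSearch caps results at MAX_SIZE = 10000, the IN list can hold up to 10,000 ids.

```java
import java.util.StringJoiner;

/** Hypothetical helper for the "ids from ES, rows from MySQL" step. */
public final class IdInClause {
    private IdInClause() {}

    /** Builds "<column> IN (?, ?, ...)" with one placeholder per id. */
    public static String placeholders(String column, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("IN list must not be empty");
        }
        StringJoiner joiner = new StringJoiner(", ", column + " IN (", ")");
        for (int i = 0; i < count; i++) {
            joiner.add("?");
        }
        return joiner.toString();
    }

    /** Page query over the ids returned by ES; bind the ids, then the offset and page size. */
    public static String pageQuery(String table, int idCount) {
        return "SELECT * FROM " + table
                + " WHERE " + placeholders("id", idCount)
                + " ORDER BY id DESC LIMIT ?, ?";
    }
}
```

With a PreparedStatement you would call setLong(i + 1, ids.get(i)) for each id, then setInt for the offset and the page size. An empty id list should skip the query entirely, which infoRequestV2 already does.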
【Optimization report】Of course, this is not the final optimization.
All times are in ms. Keywords: 交警 (traffic police), 歪果仁 (slang for "foreigners"), 提车 (picking up a new car), 保养 (car maintenance). "Tables / rows" is the number of MySQL tables touched and their total row count (M = million).
Time range | Field | Keyword | Original | v1 (ES) | v2 (split ES index) | v3 (single-table MySQL, no UNION) | v4 (no MySQL) | Tables / rows | Est. speedup (x)
Today | content | 交警 | 5079 | 8457 | 879 | 923 | pending | 1 / 5M | 3
Last 3 days | content | 交警 | 23212 | 8359 | 942 | 1052 | pending | 1 / 5M | 22
Last week | content | 交警 | 42380 | 9055 | 1069 | 1063 | pending | 1 / 5M | 39
Older than a week | content | 交警 | 113900 | 9607 | 8508 | 8826 | pending | 1 / 75M | 13
Last month | content | 交警 | 448306 | 214725 | 170833 | 44213 | pending | 2 / 80M | 10
Today | content | 歪果仁 | 31973 | 8053 | 659 | 665 | pending | 1 / 5M | 48
Last 3 days | content | 歪果仁 | 25086 | 8587 | 652 | 753 | pending | 1 / 5M | 33
Last week | content | 歪果仁 | 49043 | 7883 | 759 | 704 | pending | 1 / 5M | 69
Older than a week | content | 歪果仁 | 116818 | 8156 | 7999 | 8613 | pending | 1 / 75M | 14
Last month | content | 歪果仁 | 456657 | 8579 | 8223 | 8113 | pending | 2 / 80M | 56
Today | title | 提车 | 4280 | 1408 | 1021 | 1139 | pending | 1 / 5M | 4
Last 3 days | title | 提车 | 11150 | 3869 | 3417 | 3190 | pending | 1 / 5M | 3
Last week | title | 提车 | 18370 | 3921 | 3500 | 3288 | pending | 1 / 5M | 5
Older than a week | title | 提车 | 84029 | 42576 | 41153 | 40660 | pending | 1 / 75M | 2
Last month | title | 提车 | 263730 | 177561 | 173627 | 42564 | pending | 2 / 80M | 6
Today | title | 保养 | 4503 | 966 | 912 | 975 | pending | 1 / 5M | 5
Last 3 days | title | 保养 | 10832 | 3336 | 3142 | 3094 | pending | 1 / 5M | 3
Last week | title | 保养 | 19063 | 3813 | 3642 | 3169 | pending | 1 / 5M | 7
Older than a week | title | 保养 | 79402 | 41638 | 40708 | 40718 | pending | 1 / 75M | 2
Last month | title | 保养 | 265663 | 173113 | 179834 | 44493 | pending | 2 / 80M | 6
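The speedup column appears to be computed roughly as the original time divided by the v3 time. For the "last week / content / 交警" row that is 42380 / 1063 ≈ 39.9, listed as 39. Not every row follows this exactly: for the first row, 5079 / 923 ≈ 5.5 while 3 is listed. The column is best read as an estimate. A trivial sketch of the ratio (Speedup is a hypothetical helper):

```java
import java.util.Locale;

/** Hypothetical helper: speedup factor between two timings. */
public final class Speedup {
    private Speedup() {}

    /** Speedup factor = original time / optimized time (both in ms). */
    public static double factor(long originalMs, long optimizedMs) {
        if (optimizedMs <= 0) {
            throw new IllegalArgumentException("optimizedMs must be positive");
        }
        return (double) originalMs / optimizedMs;
    }

    public static void main(String[] args) {
        // "Last week, content, 交警": original vs. v3; Locale.ROOT keeps "." as the decimal separator
        System.out.println(String.format(Locale.ROOT, "%.1f", factor(42380, 1063))); // 39.9
    }
}
```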
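How the approach evolved:
Original to v1 (adding ES)
Goal: use ES instead of MySQL fuzzy queries to cut their cost. At this stage a single ES query took 10 to 20 s, so queries over today or other recent ranges usually did not get faster and could even get slower, mainly because of the ES query cost.
v1 to v2 (split ES index)
Goal: avoid fuzzy-matching the whole data set on every ES query. A separate index covers the most frequently searched time range (the last week), which speeds up these hot queries. This step noticeably improved queries within the last week.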
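The routing step (getIsHis in the business code) is not shown, so the sketch below only illustrates the idea under an assumption. It uses the small nlpindex_newly index when the requested range starts within the last seven days and falls back to the full index otherwise. IndexRouter and the seven-day constant are illustrative. The index names are copied from ESConstants.

```java
import java.time.Duration;

/** Hypothetical router between the "hot" last-week index and the full index. */
public final class IndexRouter {
    private IndexRouter() {}

    // Index names as defined in ESConstants
    static final String INDEX_NEWLY = "nlpindex_newly"; // data from roughly the last week
    static final String INDEX_ALL = "nlpindex_";        // full data set
    // Assumption: "hot" data means the last seven days
    static final long HOT_WINDOW_MS = Duration.ofDays(7).toMillis();

    /**
     * Picks the index for a query whose range starts at startTimeMs (epoch millis).
     * Ranges that start inside the hot window only need the small index.
     */
    public static String choose(long startTimeMs, long nowMs) {
        return startTimeMs >= nowMs - HOT_WINDOW_MS ? INDEX_NEWLY : INDEX_ALL;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(choose(now - Duration.ofDays(3).toMillis(), now));
        System.out.println(choose(now - Duration.ofDays(30).toMillis(), now));
    }
}
```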
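v2 to v3 (single-table MySQL queries)
Goal: drop the MySQL UNION across tables in favor of single-table queries whose result sets are merged in application code, working around MySQL's query bottleneck. This step noticeably improved queries older than a week and queries that span both tables.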
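The v3 code is not included in the article. One common way to replace a UNION ... ORDER BY ... LIMIT across two tables has three steps:
1. Query each table separately, sorted by id descending and limited to offset + pageSize rows.
2. Merge the two sorted lists in memory.
3. Cut out the requested page.

Below is a sketch of the merge step. MergeDesc is hypothetical, and for brevity it works on ids only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical two-way merge used instead of a SQL UNION across two tables. */
public final class MergeDesc {
    private MergeDesc() {}

    /**
     * Merges two lists that are each sorted by id descending, then returns
     * the page [offset, offset + pageSize). Each input should hold at least
     * offset + pageSize rows from its table for the page to be correct.
     */
    public static List<Long> mergePage(List<Long> a, List<Long> b, int offset, int pageSize) {
        List<Long> merged = new ArrayList<>(a.size() + b.size());
        int i = 0;
        int j = 0;
        while (i < a.size() || j < b.size()) {
            // Take from a when b is exhausted or a's head is the larger id
            if (j >= b.size() || (i < a.size() && a.get(i) >= b.get(j))) {
                merged.add(a.get(i++));
            } else {
                merged.add(b.get(j++));
            }
        }
        int from = Math.min(offset, merged.size());
        int to = Math.min(offset + pageSize, merged.size());
        return new ArrayList<>(merged.subList(from, to));
    }

    public static void main(String[] args) {
        List<Long> infoIds = Arrays.asList(9L, 5L, 1L); // e.g. from the info table
        List<Long> hisIds = Arrays.asList(8L, 6L, 2L);  // e.g. from the his table
        System.out.println(mergePage(infoIds, hisIds, 1, 3)); // [8, 6, 5]
    }
}
```

The merge is linear in the number of fetched rows. Counting the total still needs a separate COUNT per table, summed in code.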
v3 to v4 (removing MySQL)
To be continued...
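Notes:
The main bottleneck right now is the table split. A single MySQL table generally should not grow beyond about 5 million rows. For the current business needs, the sheer volume of data stored in MySQL is one of the main reasons queries are slow, so moving the data from MySQL to HBase or ES is an option.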