Elasticsearch源码分析-搜索分析(一)

0. 前言

Elasticsearch 搜索源码分析将会分为三个部分进行讲解:

  1. 请求转发及消息解析
  2. 搜索 Query 阶段
  3. 搜索 Fetch 阶段及数据排序和合并

本文将主要讲述第一部分,具体过程如下:

  1. 用户发起搜索请求,elasticsearch 使用 netty 监听消息
  2. 然后将请求进行转发,然后匹配对应的 Action
  3. 执行 Action 的 execute() 方法,
  4. 最终根据参数 SEARCH_TYPE,获取要执行的 TransportSearchTypeAction 子类,执行搜索逻辑。

1. 一个简单Query请求示例

假设我们有一个名为 item 的索引,且索引文档的 field 中包含 iid 字段,如果我们想查询 iid 为 16368545 的数据,那么 DSL 可以用一个 curl 命令写成如下格式:

curl -XGET "http://127.0.0.1:9200/item/_search" -d'
{
   "query": {
      "term": {
         "iid": {
            "value": 16368545
         }
      }
   }
}' 

参数注释:

  1. 请求的index: item
  2. 请求的es ip: 127.0.0.1
  3. 请求的query: {"query": {"term": {"iid": { "value" : "16368545" }}}}

2. 搜索时序图

第一部分主要是接收用户请求,然后匹配Action,获取实际的RestAction处理请求

搜索时序图-请求转发

第二部分主要是解析Search请求参数,封装成SearchRequest对象,然后调用搜索客户端执行search()


搜索时序图-参数解析

第三部分主要是根据search_type匹配TransportSearchTypeAction,调用子类的execute()方法执行具体的搜索逻辑


搜索时序图-执行搜索

3. 搜索过程

3.1 搜索入口及请求转发

在 elasticsearch 启动时,会启动 Netty 模块,并使用 HttpRequestHandler 的 messageReceived() 方法监听 http 请求,然后对请求进行转发,由对应的 RestAction 处理。

@ChannelHandler.Sharable
public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpRequest request;
        if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent){
            oue = (OrderedUpstreamMessageEvent) e;
            request = (HttpRequest) oue.getMessage();
        } 
        ...
        if (oue != null) {
            serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern, oue, detailedErrorsEnabled));
        } 
        ...
        super.messageReceived(ctx, e);
    }
}

在 elasticsearch 启动阶段,会执行 guice 注入,具体的代码如下:

  1. 该代码为节点启动时创建 Node 的代码,会添加 Rest 模块,在创建 Injector 时会执行 Module 的 configure() 方法
public final class InternalNode implements Node {
    public InternalNode(Settings preparedSettings, boolean loadConfigSettings) throws ElasticsearchException {
        try {
            ModulesBuilder modules = new ModulesBuilder();
            // 添加Rest模块
            modules.add(new RestModule(settings));
            ...
            injector = modules.createInjector();
        }finally {
            if (!success) {
                nodeEnvironment.close();
                ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
            }
        }
    }
}
  1. 在 Rest 模块的 configure() 方法中,会继续执行 RestAction 模块的 configure() 方法
public class RestModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(RestController.class).asEagerSingleton();
        new RestActionModule(restPluginsActions).configure(binder());
    }
}
  1. 在 RestAction 模块中,会创建 RestSearchAction 的单例对象
public class RestActionModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(RestSearchAction.class).asEagerSingleton();
    }
}
  1. 在注入 RestSearchAction 对象时,会把方法、URI 和当前对象注册到内存中
public class RestSearchAction extends BaseRestHandler {
    @Inject
    public RestSearchAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(GET, "/_search", this);
        controller.registerHandler(POST, "/_search", this);
        controller.registerHandler(GET, "/{index}/_search", this);
        ...
    }
}
  1. 下面的代码主要是为不同的method,提供不同的handler,相同的方法放在相同的handler的map中
public class RestController extends AbstractLifecycleComponent<RestController> {
    private final PathTrie<RestHandler> getHandlers = new PathTrie<>(RestUtils.REST_DECODER);
    
    public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
        switch (method) {
            case GET:
                getHandlers.insert(path, handler);
                break;
            ...
            default:
                throw new ElasticsearchIllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
        }
}
  1. 对于 URI 和 Action 对象,主要是用 PathTrie 类进行存取
public class PathTrie<T> {
    public void insert(String path, T value) {
        String[] strings = Strings.splitStringToArray(path, separator);
        if (strings.length == 0) {
            rootValue = value;
            return;
        }
        int index = 0;
        // supports initial delimiter.
        if (strings.length > 0 && strings[0].isEmpty()) {
            index = 1;
        }
        root.insert(strings, index, value);
    }
    
    public T retrieve(String path, Map<String, String> params) {
        if (path.length() == 0) {
            return rootValue;
        }
        String[] strings = Strings.splitStringToArray(path, separator);
        if (strings.length == 0) {
            return rootValue;
        }
        int index = 0;
        // supports initial delimiter.
        if (strings.length > 0 && strings[0].isEmpty()) {
            index = 1;
        }
        return root.retrieve(strings, index, params);
    }
}
  1. 请求示例中的 item/search 会匹配 /{index}/search,因此处理该请求的 handler 为 RestSearchAction

3.2 请求解析及执行搜索

由上一步可知,经过请求转发,对于 search 请求,使用 RestSearchAction 进行处理。

处理的大致过程为:先对请求 request 进行解析,得到 SearchRequest 对象,然后调用client.search() 执行搜索,最终的搜索结果使用 listener 进行返回

public class RestSearchAction extends BaseRestHandler {
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        SearchRequest searchRequest;
        searchRequest = RestSearchAction.parseSearchRequest(request);
        searchRequest.listenerThreaded(false);
        client.search(searchRequest, new RestStatusToXContentListener<SearchResponse>(channel));
    }
}

对请求解析主要分为两部分:

  1. 对http search参数的解析,将解析的结果构造为SearchRequest对象
public class RestSearchAction extends BaseRestHandler {
    public static SearchRequest parseSearchRequest(RestRequest request) {
        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
        SearchRequest searchRequest = new SearchRequest(indices);
        searchRequest.extraSource(parseSearchSource(request));
        searchRequest.searchType(request.param("search_type"));
        ...
        return searchRequest;
    }
}

对于search_type,如果请求中没有设置,则默认为QUERY_THEN_FETCH

public enum SearchType {
    public static SearchType fromString(String searchType) throws ElasticsearchIllegalArgumentException {
        if (searchType == null) {
            return SearchType.DEFAULT;
        }
        if ("dfs_query_then_fetch".equals(searchType)) {
            return SearchType.DFS_QUERY_THEN_FETCH;
        } else if ("dfs_query_and_fetch".equals(searchType)) {
            return SearchType.DFS_QUERY_AND_FETCH;
        } else if ("query_then_fetch".equals(searchType)) {
            return SearchType.QUERY_THEN_FETCH;
        } else if ("query_and_fetch".equals(searchType)) {
            return SearchType.QUERY_AND_FETCH;
        } else if ("scan".equals(searchType)) {
            return SearchType.SCAN;
        } else if ("count".equals(searchType)) {
            return SearchType.COUNT;
        } else {
            throw new ElasticsearchIllegalArgumentException("No search type for [" + searchType + "]");
        }
    }
}

解析的请求参数:

  1. index: 请求的索引名
  2. source
  3. search_type: 搜索类型,默认为QUERY_THEN_FETCH
  4. query_cache
  5. scroll: 如果是scroll请求,会出入scroll id
  6. type: 索引的type
  7. routing: 路由信息
  8. preference: 搜索偏好,包括对分片、副本和节点的偏好

2.对查询query参数的解析,将解析的结果放入SearchRequest对象中

public class RestSearchAction extends BaseRestHandler {
    public static SearchSourceBuilder parseSearchSource(RestRequest request) {
        SearchSourceBuilder searchSourceBuilder = null;
        
        QuerySourceBuilder querySourceBuilder = RestActions.parseQuerySource(request);
        if (querySourceBuilder != null) {
            searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(querySourceBuilder);
        }

        int from = request.paramAsInt("from", -1);
        if (from != -1) {
            if (searchSourceBuilder == null) {
                searchSourceBuilder = new SearchSourceBuilder();
            }
            searchSourceBuilder.from(from);
        }
        ...
        return searchSourceBuilder;
    }
}

Query支持的参数:

  1. q: 查询词
  2. from: 返回结果的起始位置
  3. size: 返回结果数
  4. explain: 展示打分的详细过程
  5. version: 请求的版本号
  6. timeout: 超时时间
  7. terminate_after
  8. fields: 要返回的字段列表
  9. fielddata_fields: fielddata中的字段列表
  10. track_scores: 强制返回文档得分
  11. sort: 指定排序字段
  12. stats
  13. suggest_field

在解析完搜索参数后,就要调用 client.search() 执行搜索

3.3 经过层层封装,最终执行的是NodeClient.execute()方法,然后从action获取对应的

TransportAction,执行其execute()方法

public class NodeClient extends AbstractClient {
    @SuppressWarnings("unchecked")
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
        transportAction.execute(request, listener);
    }
}

actions是在ActionModule中进行配置的,SearchAction对应的TransportSearchAction,该类最终继承TransportAction类

TransportSearchAction对应的类图如下:


TransportSearchAction类图
public class ActionModule extends AbstractModule {
    @Override
    protected void configure() {
        registerAction(SearchAction.INSTANCE, TransportSearchAction.class,
                TransportSearchDfsQueryThenFetchAction.class,
                TransportSearchQueryThenFetchAction.class,
                TransportSearchDfsQueryAndFetchAction.class,
                TransportSearchQueryAndFetchAction.class,
                TransportSearchScanAction.class
        );
    }
    public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction, Class... supportTransportActions) {
        actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
    }
}

因此获取到TranportSearchAction执行其execute()方法,其实是执行父类TransportAction的execute()方法

public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
    public final void execute(Request request, ActionListener<Response> listener) {
        ...
        if (filters.length == 0) {
            try {
                doExecute(request, listener);
            } catch(Throwable t) {
                logger.trace("Error during transport action execution.", t);
                listener.onFailure(t);
            }
        } else {
            RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
            requestFilterChain.proceed(actionName, request, listener);
        }
    }
    
    protected abstract void doExecute(Request request, ActionListener<Response> listener);
}

TransportAction的execute是抽象方法,因此实际执行的是TranportSearchAction的doExecute()方法

public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
    @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        if (optimizeSingleShard && searchRequest.searchType() != SCAN && searchRequest.searchType() != COUNT) {
            try {
                ...
                if (shardCount == 1) {
                    searchRequest.searchType(QUERY_AND_FETCH);
                }
            } catch (IndexMissingException|IndexClosedException e) {
            } catch (Exception e) {
                logger.debug("failed to optimize search type, continue as normal", e);
            }
        }
        if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
            dfsQueryThenFetchAction.execute(searchRequest, listener);
        } else if (searchRequest.searchType() == SearchType.QUERY_THEN_FETCH) {
            queryThenFetchAction.execute(searchRequest, listener);
        } else if (searchRequest.searchType() == SearchType.DFS_QUERY_AND_FETCH) {
            dfsQueryAndFetchAction.execute(searchRequest, listener);
        } else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) {
            queryAndFetchAction.execute(searchRequest, listener);
        } else if (searchRequest.searchType() == SearchType.SCAN) {
            scanAction.execute(searchRequest, listener);
        } else if (searchRequest.searchType() == SearchType.COUNT) {
            countAction.execute(searchRequest, listener);
        }
    }
}

在TransportSearchAction中,其实只做了两件事:

  1. 如果请求的索引只有一个分片,那么将search_type设置为QUERY_AND_FETCH
  2. 根据search_type,将search request交给对应的子类执行

各个SearchType的特点如下:

  1. DfsQueryThenFetchAction: 计算分布式词频以获得更准确的评分
  2. DfsQueryAndFetchAction: 请求针对单分片时,计算分布式词频以获得更准确的评分
  3. QueryThenFetchAction:
    3.1 在第一阶段,查询被转发到所有涉及的分片,每个分片执行搜索并生成该分片的本地结果排序列表,每个分片都向协调节点返回足够的信息,以允许它合并并将分片级别结果重新排序为具有最大长度大小的全局排序结果集
    3.2 在第二阶段期间,协调节点仅从相关分片请求文档内容(以及突出显示的片段,如果有的话)
  4. QueryAndFetchAction: 当query_then_fetch请求仅针对单个分片时,会自动选择该模式,两个阶段query和fetch都在一次过程执行
  5. ScanAction: Scan请求
  6. CountAction: Count请求

对Search具体的执行逻辑,我将以QUERY_THEN_FETCH为例,在下篇博客中详细讲解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,312评论 5 473
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,578评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,337评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,134评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,161评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,303评论 1 280
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,761评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,421评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,609评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,450评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,504评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,194评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,760评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,836评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,066评论 1 257
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,612评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,178评论 2 341

推荐阅读更多精彩内容