ElasticSearch Get流程

处理Get请求的Action为TransportGetAction,首先可以参考一下TransportAction类继承层次, TransportGetAction继承自TransportSingleShardAction,其继承层次如下图所示:

TransportAction类继承层次介绍过TransportSingleShardAction子类主要处理那些只涉及单个shard的操作,如果发生错误之后可以继续到其他副本上操作。

下面我们直接看TransportSingleShardAction.doExecute函数

//TransportSingleShardAction
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new AsyncSingleAction(request, listener).start();
}

//TransportSingleShardAction.AsyncSingleAction
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
    this.listener = listener;

    ClusterState clusterState = clusterService.state();
    if (logger.isTraceEnabled()) {
        logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
    }
    nodes = clusterState.nodes();
    ClusterBlockException blockException = checkGlobalBlock(clusterState);
    if (blockException != null) {
        throw blockException;
    }

    String concreteSingleIndex;
    //resolveIndex在子类TransportGetAction没有任务操作,直接返回true
    if (resolveIndex(request)) {
        concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
    } else {
        concreteSingleIndex = request.index();
    }
    this.internalRequest = new InternalRequest(request, concreteSingleIndex);
    //TransportGetAction覆写该函数注释为:
    //update the routing
    resolveRequest(clusterState, internalRequest);

    blockException = checkRequestBlock(clusterState, internalRequest);
    if (blockException != null) {
        throw blockException;
    }
    //这个方法在TransportGetAction中定义,负责找到可以处理该请求的分片Iterator
    this.shardIt = shards(clusterState, internalRequest);
}

在构造AsyncSingleAction之后,会调用其start方法:

//TransportSingleShardAction.AsyncSingleAction
public void start() {
    //在本节点没有获取到处理该请求的分片迭代器
    if (shardIt == null) {
        //下面是官网的注释,写的很清楚,直接在本地尝试进行具体的Get操作
        //能找到Get结果则正常返回,否则返回失败。
        //本地执行Get操作逻辑和下面介绍的把请求发送给其他节点的操作一样,
        //都是调用transportShardAction注册的ShardTransportHandler处理
        // just execute it on the local node
        transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {
            @Override
            public Response newInstance() {
                return newResponse();
            }

            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }

            @Override
            public void handleResponse(final Response response) {
                listener.onResponse(response);
            }

            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }
        });
    } else {
        //如果在本节点获取到可以处理该请求的分片
        perform(null);
    }
}

perform函数的逻辑如下:

private void perform(@Nullable final Exception currentFailure) {
        //异常的相关处理,记录最后发生的异常,如果操作失败
        //(在本节点或者所有节点操作都失败)返回该异常给客户端
        Exception lastFailure = this.lastFailure;
        if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
            lastFailure = currentFailure;
            this.lastFailure = currentFailure;
        }
        //从分片列表中获取下一个分片,这里也是重试逻辑所在,如果获取到shardRouting
        //但是操作失败,会再次执行perform函数,此时会获取到下一个ShardRouting
        final ShardRouting shardRouting = shardIt.nextOrNull();
        //如果没有获取到ShardRouting则返回失败给客户端
        if (shardRouting == null) {
            Exception failure = lastFailure;
            if (failure == null || isShardNotAvailableException(failure)) {
                failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
            } else {
                logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, internalRequest.request()), failure);
            }
            listener.onFailure(failure);
            return;
        }
        //从ShardRouting中获取node
        DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
        //获取失败则调用AsyncSingleAction.onFailure,在该函数中会再次调用perform函数
        if (node == null) {
            onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
        } else {
            ···
            //当前节点为协调节点,会发送Get请求到ShardIt中的第一个节点
            //使用的action为transportShardAction
            transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {

                @Override
                public Response newInstance() {
                    return newResponse();
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
                //返回成功会调用listener.onResponse返回结果给客户端
                @Override
                public void handleResponse(final Response response) {
                    listener.onResponse(response);
                }
                //发生异常则调用AsyncSingleAction.onFailure重试
                @Override
                public void handleException(TransportException exp) {
                    onFailure(shardRouting, exp);
                }
            });
        }
    }
}

//AsyncSingleAction.onFailure主要记录日志并调用perform重试
private void onFailure(ShardRouting shardRouting, Exception e) {
    if (e != null) {
        logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, internalRequest.request()), e);
    }
    perform(e);
}

上面代码中我们看到协调节点会依次从解析到的节点迭代器获取节点,发送请求给该节点,使用的action为action为transportShardAction。在TransportSingleShardAction的构造函数中可以看到注册的请求处理handler如下:

//TransportSingleShardAction
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());

TransportSingleShardAction.ShardTransportHandler如下:

//TransportSingleShardAction.ShardTransportHandler
private class ShardTransportHandler implements TransportRequestHandler<Request> {
    @Override
    public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
        }
        //调用异步操作
        asyncShardOperation(request, request.internalShardId, new HandledTransportAction.ChannelActionListener<>(channel,
            transportShardAction, request));
    }
}

TransportSingleShardAction.asyncShardOperation在子类TransportGetAction被重写了,如下:

@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());
    //这是Get请求的一个参数,如果realtime=true,则该请求会直接刷新索引,
    //然后进行查询并返回
    if (request.realtime()) { // we are not tied to a refresh cycle here anyway
        super.asyncShardOperation(request, shardId, listener);
    } else {
        //如果realtime=false,则注册一个刷新listener,该索引刷新时会调用,然后进行查询并返回
        //这里就需要等待系统的刷新时间达到,和设置的刷新频率有关。
        indexShard.awaitShardSearchActive(b -> {
            try {
                super.asyncShardOperation(request, shardId, listener);
            } catch (Exception ex) {
                listener.onFailure(ex);
            }
        });
    }
}

TransportSingleShardAction.asyncShardOperation如下:

//TransportSingleShardAction
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
    threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            listener.onFailure(e);
        }

        @Override
        protected void doRun() throws Exception {
            //shardOperation是真正处理请求的地方,在TransportSingleShardAction子类
            //TransportGetAction中定义
            listener.onResponse(shardOperation(request, shardId));
        }
    });
}

下面看TransportGetAction.shardOperation

//TransportGetAction
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());

    if (request.refresh() && !request.realtime()) {
        indexShard.refresh("refresh_flag_get");
    }

    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
            request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(result);
}

最终会在Engine中处理Get操作,下面大致看一下InternalEngine.get函数实现:

//InternalEngine
@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
    assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        SearcherScope scope;
        //realtime=true下面会执行一次refresh操作
        if (get.realtime()) {
            VersionValue versionValue = null;
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            }
            if (versionValue != null) {
                //如果该文档标识被删除了,则返回不存在
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                }
                //版本冲突检查
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                    throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                        get.versionType().explainConflictForReads(versionValue.version, get.version()));
                }
                //通过函数调用轨迹可以看到这里默认会返回false
                //从下面的注释也可以看出这里只用于update操作
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read form a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {
                        try {
                            Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                            if (operation != null) {
                                // in the case of a already pruned translog generation we might get null here - yet very unlikely
                                TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
                                    .getIndexSettings().getIndexVersionCreated());
                                return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
                                    new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
                            }
                        } catch (IOException e) {
                            maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                            throw new EngineException(shardId, "failed to read operation from translog", e);
                        }
                    } else {
                        trackTranslogLocation.set(true);
                    }
                }
                //realtime=true,执行刷新
                refresh("realtime_get", SearcherScope.INTERNAL);
            }
            scope = SearcherScope.INTERNAL;
        } else {
            // we expose what has been externally expose in a point in time snapshot via an explicit refresh
            scope = SearcherScope.EXTERNAL;
        }

        // no version, get the version from the index, we know that we refresh on flush
        //获取最终的文档
        return getFromSearcher(get, searcherFactory, scope);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,440评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,814评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,427评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,710评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,625评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,014评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,511评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,162评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,311评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,262评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,278评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,989评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,583评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,664评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,904评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,274评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,856评论 2 339

推荐阅读更多精彩内容

  • 这是16年5月份编辑的一份比较杂乱适合自己观看的学习记录文档,今天18年5月份再次想写文章,发现简书还为我保存起的...
    Jenaral阅读 2,718评论 2 9
  • 据说苏州这家诚品书店是大陆唯一一家。其实这也没什么新鲜,大陆唯一一家的店太多了。不过品牌大,所以去看看。 诚品文具...
    献刀阅读 268评论 0 0
  • 文|杂家大兵 工作这几年,经历过两家公司,规模一个几千人,一个上万人,所以谈不上说阅人无数,但也多多少少遇到过很多...
    杂家大兵阅读 734评论 0 2
  • 愿所有被遗忘的都不会突然醒来 让我睡得安心 愿所有被掩饰的都不会浮出水面 让我保持沉默 愿森林覆盖森林 愿河流奔向...
    珞偲圭璋阅读 261评论 0 0
  • 这段时间负能量太多。 宝宝生日那天,我跟宝宝拍了个合照,发到朋友圈,自我感觉良好。 后来第二天和朋友聊天,我说到要...
    茶小箐阅读 485评论 5 1