ElasticSearch Get流程

处理Get请求的Action为TransportGetAction,首先可以参考一下TransportAction类继承层次, TransportGetAction继承自TransportSingleShardAction,其继承层次如下图所示:



protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new AsyncSingleAction(request, listener).start();

private AsyncSingleAction(Request request, ActionListener<Response> listener) {
    this.listener = listener;

    ClusterState clusterState = clusterService.state();
    if (logger.isTraceEnabled()) {
        logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
    nodes = clusterState.nodes();
    ClusterBlockException blockException = checkGlobalBlock(clusterState);
    if (blockException != null) {
        throw blockException;

    String concreteSingleIndex;
    if (resolveIndex(request)) {
        concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
    } else {
        concreteSingleIndex = request.index();
    this.internalRequest = new InternalRequest(request, concreteSingleIndex);
    //update the routing
    resolveRequest(clusterState, internalRequest);

    blockException = checkRequestBlock(clusterState, internalRequest);
    if (blockException != null) {
        throw blockException;
    this.shardIt = shards(clusterState, internalRequest);


public void start() {
    if (shardIt == null) {
        // just execute it on the local node
        transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {
            public Response newInstance() {
                return newResponse();

            public String executor() {
                return ThreadPool.Names.SAME;

            public void handleResponse(final Response response) {

            public void handleException(TransportException exp) {
    } else {


private void perform(@Nullable final Exception currentFailure) {
        Exception lastFailure = this.lastFailure;
        if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
            lastFailure = currentFailure;
            this.lastFailure = currentFailure;
        final ShardRouting shardRouting = shardIt.nextOrNull();
        if (shardRouting == null) {
            Exception failure = lastFailure;
            if (failure == null || isShardNotAvailableException(failure)) {
                failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
            } else {
                logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, internalRequest.request()), failure);
        DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
        if (node == null) {
            onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
        } else {
            transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {

                public Response newInstance() {
                    return newResponse();

                public String executor() {
                    return ThreadPool.Names.SAME;
                public void handleResponse(final Response response) {
                public void handleException(TransportException exp) {
                    onFailure(shardRouting, exp);

private void onFailure(ShardRouting shardRouting, Exception e) {
    if (e != null) {
        logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, internalRequest.request()), e);


transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());


private class ShardTransportHandler implements TransportRequestHandler<Request> {
    public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
        asyncShardOperation(request, request.internalShardId, new HandledTransportAction.ChannelActionListener<>(channel,
            transportShardAction, request));


protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());
    if (request.realtime()) { // we are not tied to a refresh cycle here anyway
        super.asyncShardOperation(request, shardId, listener);
    } else {
        indexShard.awaitShardSearchActive(b -> {
            try {
                super.asyncShardOperation(request, shardId, listener);
            } catch (Exception ex) {


protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
    threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
        public void onFailure(Exception e) {

        protected void doRun() throws Exception {
            listener.onResponse(shardOperation(request, shardId));


protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());

    if (request.refresh() && !request.realtime()) {

    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
            request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(result);


public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
    assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
    try (ReleasableLock ignored = readLock.acquire()) {
        SearcherScope scope;
        if (get.realtime()) {
            VersionValue versionValue = null;
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            if (versionValue != null) {
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                    throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                        get.versionType().explainConflictForReads(versionValue.version, get.version()));
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read form a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {
                        try {
                            Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                            if (operation != null) {
                                // in the case of a already pruned translog generation we might get null here - yet very unlikely
                                TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
                                return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
                                    new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
                        } catch (IOException e) {
                            maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                            throw new EngineException(shardId, "failed to read operation from translog", e);
                    } else {
                refresh("realtime_get", SearcherScope.INTERNAL);
            scope = SearcherScope.INTERNAL;
        } else {
            // we expose what has been externally expose in a point in time snapshot via an explicit refresh
            scope = SearcherScope.EXTERNAL;

        // no version, get the version from the index, we know that we refresh on flush
        return getFromSearcher(get, searcherFactory, scope);
