Elasticsearch 客户端源码操作流程 1

阅读ES源代码需要对Guice这个依赖注入框架有一点基本了解, 稍微了解一点基础就不影响阅读.这里也写了个基本看ES Guice的使用方式. ES源码的版本是2.4.1版本.

1 初始化TransportClient

我们操作Elasticsearch时, 会首先创建Client类, 代码如下:

private static Client getClient(){
        Settings settings = Settings.builder().put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
            .put("client.transport.ping_timeout", "1200s").build();
        Client client = null;
        try {
            client = TransportClient.builder().settings(settings).build().addTransportAddress
                (new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return client;
    }

即实例化TransportClient对象,看看其源代码

public TransportClient build() {
            //省略...
            final ThreadPool threadPool = new ThreadPool(settings);
            NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();

            boolean success = false;
            try {
                ModulesBuilder modules = new ModulesBuilder();
                modules.add(new Version.Module(version));
                // plugin modules must be added here, before others or we can get crazy injection errors...
                for (Module pluginModule : pluginsService.nodeModules()) {
                    modules.add(pluginModule);
                }
                modules.add(new PluginsModule(pluginsService));
                modules.add(new SettingsModule(this.settings));
                modules.add(new NetworkModule(namedWriteableRegistry));
                modules.add(new ClusterNameModule(this.settings));
                modules.add(new ThreadPoolModule(threadPool));
                modules.add(new TransportModule(this.settings, namedWriteableRegistry));
                modules.add(new SearchModule() {
                    @Override
                    protected void configure() {
                        // noop
                    }
                });
                modules.add(new ActionModule(true));
                modules.add(new ClientTransportModule(hostFailedListener));
                modules.add(new CircuitBreakerModule(this.settings));

                pluginsService.processModules(modules);

                Injector injector = modules.createInjector();
                final TransportService transportService = injector.getInstance(TransportService.class);
                transportService.start();
                transportService.acceptIncomingRequests();

                TransportClient transportClient = new TransportClient(injector); // 初始化TransportClient对象
                success = true;
                return transportClient;
            } finally {
                if (!success) {
                    ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
                }
            }
        }

可以知道代码里面是先通过Guice框架完成相关Module的依赖注入,比如你的增删改查Action实际上是ActionModule所包含的具体实现, 不同Module完成不同的工作.然后TransportClient transportClient = new TransportClient(injector)这个就是实例化TransportClient对象了, 来看看它的构造方法,

private TransportClient(Injector injector) {
        super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
        this.injector = injector;
        nodesService = injector.getInstance(TransportClientNodesService.class);
        proxy = injector.getInstance(TransportProxyClient.class); // 代理类, 真正的完成execute工作的人
    }

nodeService是节点操作类, proxy是代理类,是具体的执行execute方法操作的类.因此看看TransportProxyClient

2 TransportProxyClient

@Inject
    public TransportProxyClient(Settings settings, TransportService transportService, TransportClientNodesService nodesService, Map<String, GenericAction> actions) {
        this.nodesService = nodesService;
        MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
        for (GenericAction action : actions.values()) {
            if (action instanceof Action) {
                actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = actionsBuilder.immutableMap();
    }

TransportProxyClient使用的是构造器注入, TransportProxyClient 此处是构造器注入, Settings是SettingsModule 绑定的, TransportService 是 TransportModule绑定的, TransportClientNodesService 是 ClientTransportModule绑定的, Map<String, GenericAction> actions是ActionModule绑定生成的. 其中里面的proxies变量比较重要,他其实是一个Map, key是Action, value是TransportActionNodeProxy, 那么这些Action是什么东西呢?
我们知道一开始初始化TransportClient 的时候绑定了很多module其中有一个是ActionModule, 它的configure函数内容如下:

protected void configure() {

        Multibinder<ActionFilter> actionFilterMultibinder = Multibinder.newSetBinder(binder(), ActionFilter.class);
        for (Class<? extends ActionFilter> actionFilter : actionFilters) {
            actionFilterMultibinder.addBinding().to(actionFilter);
        }
        bind(ActionFilters.class).asEagerSingleton();
        bind(AutoCreateIndex.class).asEagerSingleton();
        bind(DestructiveOperations.class).asEagerSingleton();
        registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
        registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
        registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
        registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
        registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

        registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
        registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
        registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
        registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
        registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
        registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
        registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
        registerAction(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
        registerAction(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
        registerAction(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
        registerAction(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
        registerAction(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
        registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
        registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
        registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
        registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

        registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
        registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
        registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
        registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
        registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
        registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
        registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
        registerAction(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
        registerAction(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
        registerAction(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
        registerAction(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
        registerAction(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class);
        registerAction(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
        registerAction(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
        registerAction(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
        registerAction(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
        registerAction(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
        registerAction(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
        registerAction(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
        registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
        registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
        registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
        registerAction(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
        registerAction(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
        registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
        registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
        registerAction(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
        registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
        registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class);
        registerAction(DeleteWarmerAction.INSTANCE, TransportDeleteWarmerAction.class);
        registerAction(GetWarmersAction.INSTANCE, TransportGetWarmersAction.class);
        registerAction(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
        registerAction(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
        registerAction(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);

        registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
        registerAction(GetAction.INSTANCE, TransportGetAction.class);
        registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class,
                TransportDfsOnlyAction.class);
        registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
                TransportShardMultiTermsVectorAction.class);
        registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);
        registerAction(ExistsAction.INSTANCE, TransportExistsAction.class);
        registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
        registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class);
        registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
                TransportShardMultiGetAction.class);
        registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
                TransportShardBulkAction.class);
        registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
        registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
        registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
        registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
        registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
        registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
        registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
        registerAction(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
        registerAction(RenderSearchTemplateAction.INSTANCE, TransportRenderSearchTemplateAction.class);

        //Indexed scripts
        registerAction(PutIndexedScriptAction.INSTANCE, TransportPutIndexedScriptAction.class);
        registerAction(GetIndexedScriptAction.INSTANCE, TransportGetIndexedScriptAction.class);
        registerAction(DeleteIndexedScriptAction.INSTANCE, TransportDeleteIndexedScriptAction.class);

        registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);

        // register Name -> GenericAction Map that can be injected to instances.
        MapBinder<String, GenericAction> actionsBinder
                = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);

        for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
            actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
        }
//.......省略
}

public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction, Class... supportTransportActions) {
        actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
    }

上面ActionModule 通过registerAction方法注册Action, 并加入到actions这个Map函数中, 然后通过MapBinder<String, GenericAction> 将数据注入其他实例,一次初始化TransportProxyClient的时候即可使用此MapBinder<String, GenericAction>

// register Name -> GenericAction Map that can be injected to instances.
MapBinder<String, GenericAction> actionsBinder
                = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);

        for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
            actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
        }

3 Request操作流程

经常会使用ES的增删改查以及Admin相关操作, 实际上最终都会经过代理类TransportProxyClient进行执行. 这里以Get操作来说明一下, 其他的Request类似.
首先放一个Get的demo代码:

public class App
{
    public static void main(String[] args) {
        create();
    }

    public static void create(){
        Client client = getClient();


        getInfo(client);
        //getIndice(client);
        
    }
    private static void getInfo(Client client) {
        GetRequestBuilder getRequestBuilder = client.prepareGet("face_fixedperson", "Fixedperson", "126tc");
        GetResponse getFields = getRequestBuilder.execute().actionGet();
        String sourceAsString = getFields.getSourceAsString();
        System.out.println("-----" + sourceAsString);
    }
    private static void getIndice(Client client) {
        //Index index = new Index();
        AdminClient admin = client.admin();
        IndicesAdminClient indices = admin.indices();

        ListenableActionFuture<GetIndexResponse> execute = indices.prepareGetIndex().execute();
        GetIndexResponse getIndexResponse = execute.actionGet();
        String[] indices1 = getIndexResponse.getIndices();
        for (String in : indices1) {
            System.out.println(in);
        }
    }

    private static Client getClient(){
        Settings settings = Settings.builder().put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
            .put("client.transport.ping_timeout", "1200s").build();
        Client client = null;
        try {
            client = TransportClient.builder().settings(settings).build().addTransportAddress
                (new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return client;
    }
}

当执行getRequestBuilder.execute()的时候, 会先转到ActionRequestBuilder的

public void execute(ActionListener<Response> listener) {
        client.execute(action, beforeExecute(request), listener);
    }

然后转到了AbstractClient的

public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        listener = threadedWrapper.wrap(listener);
        doExecute(action, request, listener);
    }

其中的doExecute实际上是TransportClient的doExecute, 其内部实际上就是通过TransportProxyClient来实现进行具体的execute操作的.

@Override
    protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        proxy.execute(action, request, listener);
    }

继续看看TransportProxyClient的execute方法:

public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
            @Override
            public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
                proxy.execute(node, request, listener);
            }
        }, listener);
    }

TransportActionNodeProxy<Request, Response> proxy = proxies.get(action); 这一步根据Action来获取具体的TransportActionNodeProxy类, 具体可参见TransportProxyClient的构造那部分.
接着nodesService.execute方法实际上就是从可用的Node中随机选择出一个node,然后执行proxy.execute(node, request, listener)方法, 进而将请求发给此Node.

大题来说, ES一开始初始化TransportClient的时候会绑定多种多样的Module, 然后在初始化TransportClient的时候同时会初始化代理类TransportProxyClient, 当通过TransportClient来提交Request请求的时候吧, 会交由代理类来真正的执行, 代理类会从可用节点中选择一个节点然后发送请求到此节点上. 这就是ES操作的前奏, 至于具体的操作细节则后面在写.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,563评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,531评论 18 399
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,835评论 6 13
  • 枕在冬日宁静安祥的臂弯,踩着清晨的第一片柔软的银色素毯,揉触着空气寒凉的体肤,格外清越,一种空冥的感觉拉近了山野与...
    濠牛B阅读 611评论 0 0
  • 我在画布上涂描 有七月的风吹过 窗外大雨滂沱 泛滥着心底那条河 桌角相框里的合影 已褪了颜色 阳光洒满的眼神 那么...
    眉间飞雪阅读 192评论 10 10