Kettle插件开发之Elasticsearch篇

一、开发背景

知己知彼,百战不殆。既然要开发Elasticsearch批量写入插件,那我们首先了解下ElasticSearch。
• Elasticsearch是一个实时分布式存储、搜索和数据分析引擎。它让你以前所未有的速度处理大数 据成为可能。它用于全文搜索、结构化搜索、分析以及将这三者混合使用。
• Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。
• Elasticsearch很快,快到不可思议。强大的设计,方便我们通过有限状态转换器实现了用于全文检索的倒排索引,实现了用于存储数值数据和地理位置数据的 BKD 树,以及用于分析的列存储。
• Elasticsearch和传统数据库RDBMS比较


image.png

二、索引创建

基于Kettle环境平台,构建app-pentaho-es6或app-pentaho-es7插件,实现原理是动态数据流字段,自定动态索引,来实现ElasticSearch Bulk Insert批量写入。所以,我们首先要了解如何基于索引模板(kettle-es)模式,按日期分片创建动态索引(kettle-es_*)实现写入,按别名kettle-es-query来实现索引检索。具体示例如下:

PUT _template/kettle-es
{
  "index_patterns": [
    "kettle-es_*"
  ],
  "settings": {
    "index": {
      "max_result_window": "100000",
      "number_of_shards": "3",
      "number_of_replicas": "1"
    }
  },
  "aliases": {
    "kettle-es-query": {}
  },
  "mappings": {
    "properties": {
      "agent_ip": {
        "type": "ip"
      },
      "record_id": {
        "type": "integer"
      },
      "start_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "fire_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "end_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "pid": {
        "type": "keyword"
      },
      "message": {
        "index": false,
        "type": "text"
      }
    }
  }
}

三、ElasticSearch Bulk Insert源代码介绍

3.1、源代码目录结构

无论是app-pentaho-es6或app-pentaho-es7,从代码目录结构来看,都是ElasticSearchBulk步骤类、ElasticSearchBulkData数据类、ElasticSearchBulkMeta元数据类和ElasticSearchBulkDialog对话框类、日志消息提醒配置message。具体可查看源代码

3.1.1、目录结构对比

image.png

3.1.2、maven配置对比

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
        <build.revision>${project.version}</build.revision>
        <timestamp>${maven.build.timestamp}</timestamp>
        <build.description>${project.description}</build.description>
        <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
        <elasticsearch.version>6.4.2</elasticsearch.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>compile</scope>
        </dependency>
   <dependencies>
<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
        <build.revision>${project.version}</build.revision>
        <timestamp>${maven.build.timestamp}</timestamp>
        <build.description>${project.description}</build.description>
        <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
        <elasticsearch.version>7.2.0</elasticsearch.version>
    </properties>
    <dependencies>
       <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>compile</scope>
        </dependency>
   <dependencies>

3.2、app-pentaho-es6插件

3.2.1、TransportClient API说明

app-pentaho-es6插件,基于TransportClient使用传输模块远程连接到集群。后面,elastic计划在Elasticsearch 7.0中弃用TransportClient,并在8.0中完全删除它。
  获取Elasticsearch客户端最常用方法是创建连接到群集的TransportClient。它不加入集群,而只是获取一个或多个初始传输地址,并在每个操作上以循环方式与它们通信。

3.2.2、客户端初始化

(1)确定PreBuiltTransportClient连接es集群名称、地址、端口和协议等信息,设置TransportAddress配置
(2)测试制定Index是否正常连接成功,得到Client

private void initClient() throws UnknownHostException {
        Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
        Map<String, String> tMetaMap = meta.getSettingsMap();
        Iterator<Entry<String, String>> iter = tMetaMap.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, String> entry = (Entry<String, String>) iter.next();
            settingsBuilder.put(entry.getKey(),
                    environmentSubstitute(entry.getValue()));
        }
        PreBuiltTransportClient tClient = new PreBuiltTransportClient(
                settingsBuilder.build());
        for (Server server : meta.getServers()) {
            tClient.addTransportAddress(new TransportAddress(InetAddress
                    .getByName(environmentSubstitute(server.getAddress())),
                    server.getPort()));
        }
        client = tClient;
    }

3.2.3、数据流处理

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        Object[] rowData = getRow();
        if (rowData == null) {
            if (currentRequest != null && currentRequest.numberOfActions() > 0) {
                // didn't fill a whole batch
                processBatch(false);
            }
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            setupData();
            currentRequest = client.prepareBulk();
            requestsBuffer = new ArrayList<IndexRequestBuilder>(this.batchSize);
            initFieldIndexes();
        }
        try {
            data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
            return indexRow(data.inputRowMeta, rowData) || !stopOnError;
        } catch (KettleStepException e) {
            throw e;
        } catch (Exception e) {
            rejectAllRows(e.getLocalizedMessage());
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e);
        }
    }

3.2.4、数据批次处理

private boolean processBatch(boolean makeNew) throws KettleStepException {
        ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
        boolean responseOk = false;
        BulkResponse response = null;
        try {
            if (timeout != null && timeoutUnit != null) {
                response = actionFuture.actionGet(timeout, timeoutUnit);
            } else {
                response = actionFuture.actionGet();
            }
        } catch (ElasticsearchException e) {
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Error.BatchExecuteFail",
                    e.getLocalizedMessage());
            if (e instanceof ElasticsearchTimeoutException) {
                msg = BaseMessages.getString(PKG,
                        "ElasticSearchBulk.Error.Timeout");
            }
            logError(msg);
            rejectAllRows(msg);
        }
        if (response != null) {
            responseOk = handleResponse(response);
            requestsBuffer.clear();
        } else { // have to assume all failed
            numberOfErrors += currentRequest.numberOfActions();
            setErrors(numberOfErrors);
        }
        // duration += response.getTookInMillis(); //just in trunk..
        if (makeNew) {
            currentRequest = client.prepareBulk();
            data.nextBufferRowIdx = 0;
            data.inputRowBuffer = new Object[batchSize][];
        } else {
            currentRequest = null;
            data.inputRowBuffer = null;
        }
        return responseOk;
    }

3.3、app-pentaho-es7插件

3.3.1、RestHighLevelClient API说明

app-pentaho-es7插件,基于Elasticsearch提供的Java高级REST客户端RestHighLevelClient,它执行HTTP请求而不是序列化的Java请求。Java客户端主要用途有:
(1)在现有集群上执行标准索引,获取,删除和搜索操作
(2)在正在运行的集群上执行管理任务

3.3.2、客户端初始化

(1)使用CredentialsProvider初始化Elasticsearch身份认证
(2)确定RestHighLevelClient连接es集群名称、地址、端口和协议等信息,设置setHttpClientConfigCallback回调配置
(3)测试制定Index是否正常连接成功,得到Client

private void initClient() throws UnknownHostException {
        Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
        meta.getSettingsMap()
                .entrySet()
                .stream()
                .forEach(
                        (s) -> settingsBuilder.put(s.getKey(),
                                environmentSubstitute(s.getValue())));
        RestHighLevelClient rclient = null;
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(settingsBuilder.get("es.user"),
                        settingsBuilder.get("es.password")));
        for (Server server : meta.getServers()) {
            rclient = new RestHighLevelClient(RestClient.builder(
                    new HttpHost(server.getAddress(), Integer.valueOf(server
                            .getPort()), "http")).setHttpClientConfigCallback(
                    new RestClientBuilder.HttpClientConfigCallback() {
                        public HttpAsyncClientBuilder customizeHttpClient(
                                HttpAsyncClientBuilder httpClientBuilder) {
                            return httpClientBuilder
                                    .setDefaultCredentialsProvider(credentialsProvider);
                        }
                    }));
        }
        client = rclient;
    }

3.3.3、数据流处理

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        Object[] rowData = getRow();
        if (rowData == null) {
            if (currentRequest != null && currentRequest.numberOfActions() > 0) {
                processBatch(false);
            }
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            setupData();
            requestsBuffer = new ArrayList<IndexRequest>(this.batchSize);
            initFieldIndexes();
        }
        try {
            data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
            return indexRow(data.inputRowMeta, rowData) || !stopOnError;
        } catch (KettleStepException e) {
            throw e;
        } catch (Exception e) {
            rejectAllRows(e.getLocalizedMessage());
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e);
        }
    }

3.3.4、数据批次处理

private boolean processBatch(boolean makeNew) throws KettleStepException {
        BulkResponse response = null;
        // ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
        try {
            response = client.bulk(currentRequest, RequestOptions.DEFAULT);
        } catch (IOException e1) {
            rejectAllRows(e1.getLocalizedMessage());
            String msg = BaseMessages
                    .getString(PKG, "ElasticSearchBulk.Log.Exception",
                            e1.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e1);
        }
        boolean responseOk = false;
        if (response != null) {
            responseOk = handleResponse(response);
            requestsBuffer.clear();
        } else { // have to assume all failed
            numberOfErrors += currentRequest.numberOfActions();
            setErrors(numberOfErrors);
        }
        if (makeNew) {
            // currentRequest = client.prepareBulk();
            try {
                client.bulk(currentRequest, RequestOptions.DEFAULT);
            } catch (IOException e1) {
                rejectAllRows(e1.getLocalizedMessage());
                String msg = BaseMessages.getString(PKG,
                        "ElasticSearchBulk.Log.Exception",
                        e1.getLocalizedMessage());
                logError(msg);
                throw new KettleStepException(msg, e1);
            }
            data.nextBufferRowIdx = 0;
            data.inputRowBuffer = new Object[batchSize][];
        } else {
            currentRequest = null;
            data.inputRowBuffer = null;
        }
        return responseOk;
    }

无论服务端是那个版本的Elasticsearch集群,客户端必须具有与服务端群集中的节点相同的主要版本(例如6.x或7.x)

四、ElasticSearch Bulk Insert使用说明

4.1、General参数

①Index:动态索引字段,索引前缀+动态日期

②Type:默认_doc

③Test Index:在线检查索引是否存在

④Batch Size:批次大小

⑤Stop on error:遇到错误是否终止

⑥Batch Timeout:批次写入超时时间,单位秒

⑦Id Field:即文档ID,doc_id

⑧Overwrite if exists:存在是否覆盖

⑨Output Rows:输出行

image.png

4.2、Servers参数

①Address:Elasticsearch集群地址列表

②Port:匹配端口号

image.png

4.3、Fields输出字段

①Name:数据流字段

②Target Name:Elasticsearch集群对应index,目标mapping字段

image.png

4.4、Settings参数

①cluster.name:集群名称

②es.user:es鉴权认证用户名,自定义参数名

③es.password:es鉴权认证密码,自定义参数名

image.png

五、总结

如果你需要源码或者了解更多自定义插件及集成方式,抑或有开发过程或者使用过程中的任何疑问或建议,请关注小编"游走在数据之间"。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,784评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,745评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,702评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,229评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,245评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,376评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,798评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,471评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,655评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,485评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,535评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,235评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,793评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,863评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,096评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,654评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,233评论 2 341

推荐阅读更多精彩内容