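1. Development Background
Know your enemy and know yourself, and you will win every battle. Since we are going to build an Elasticsearch bulk-insert plugin, let's start by getting to know Elasticsearch.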
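• Elasticsearch is a real-time, distributed storage, search, and analytics engine that makes it possible to process large volumes of data at very high speed. It is used for full-text search, structured search, analytics, and any combination of the three.
• Elasticsearch is written in Java and uses Lucene at its core for all indexing and search, but its goal is to hide Lucene's complexity behind a simple RESTful API, making full-text search easy.
• Elasticsearch is fast, remarkably fast. Its design uses finite state transducers (FSTs) in the inverted index that powers full-text search, BKD trees to store numeric and geo-location data, and a column store for analytics.
• Elasticsearch compared with a traditional relational database (RDBMS)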
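As a concrete picture of the inverted index mentioned in the bullets above, here is a toy in-memory version that maps each term to the sorted set of IDs of the documents containing it. It is a simplified illustration under stated assumptions (lowercased whitespace tokenization, no FST compression, no scoring) and not how Lucene actually lays out its index; `ToyInvertedIndex` is a hypothetical name.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical toy inverted index: term -> sorted set of IDs of documents containing it. */
final class ToyInvertedIndex {
    private final Map<String, Set<Integer>> postings = new TreeMap<>();

    /** Lowercases the text, splits on whitespace, and records docId under every term. */
    void add(int docId, String text) {
        for (String token : text.toLowerCase().split("\\s+")) {
            if (token.isEmpty()) {
                continue; // leading whitespace produces an empty first token
            }
            postings.computeIfAbsent(token, t -> new TreeSet<>()).add(docId);
        }
    }

    /** Returns the IDs of documents containing the term (empty set if none). */
    Set<Integer> search(String term) {
        Set<Integer> ids = postings.get(term.toLowerCase());
        return ids == null ? Collections.<Integer>emptySet() : Collections.unmodifiableSet(ids);
    }
}
```

A lookup is a single map access followed by reading the posting set, which is why term queries do not need to scan every document.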
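2. Index Creation
On the Kettle platform we build the app-pentaho-es6 or app-pentaho-es7 plugin. It implements ElasticSearch Bulk Insert by using fields from the data stream to build custom, dynamic index names. So first we need to understand how to use an index template (kettle-es) so that date-partitioned dynamic indices (kettle-es_*) are created on write and can all be searched through the alias kettle-es-query. For example: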
PUT _template/kettle-es
{
  "index_patterns": [
    "kettle-es_*"
  ],
  "settings": {
    "index": {
      "max_result_window": "100000",
      "number_of_shards": "3",
      "number_of_replicas": "1"
    }
  },
  "aliases": {
    "kettle-es-query": {}
  },
  "mappings": {
    "properties": {
      "agent_ip": {
        "type": "ip"
      },
      "record_id": {
        "type": "integer"
      },
      "start_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "fire_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "end_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "pid": {
        "type": "keyword"
      },
      "message": {
        "index": false,
        "type": "text"
      }
    }
  }
}
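The template above only takes effect for indices whose names match `kettle-es_*`. As a minimal sketch of how a date-based index name could be derived for each row, the helper below parses a row timestamp in the template's `yyyy-MM-dd HH:mm:ss` format and appends a formatted date to the prefix. The `yyyy.MM.dd` suffix and the class and method names are assumptions for illustration; the article does not say which date format the plugin uses.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that derives a per-day index name from a row's timestamp. */
final class DailyIndexName {
    // Assumed suffix format; adjust to whatever the step's Index setting actually uses.
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");
    // Same format as the start_time / fire_time / end_time mappings in the template.
    private static final DateTimeFormatter ROW_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** For example, prefix "kettle-es_" and "2019-07-01 08:30:00" give "kettle-es_2019.07.01". */
    static String forRow(String prefix, String startTime) {
        LocalDateTime ts = LocalDateTime.parse(startTime, ROW_TIME);
        return prefix + SUFFIX.format(ts);
    }

    /** Mirrors the template's "kettle-es_*" pattern: the fixed prefix followed by anything. */
    static boolean matchesTemplate(String indexName) {
        return indexName.startsWith("kettle-es_");
    }
}
```

Because every index created this way automatically receives the kettle-es-query alias from the template, queries can target the alias instead of enumerating the daily indices.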
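3. ElasticSearch Bulk Insert Source Code
3.1 Source Code Directory Structure
app-pentaho-es6 and app-pentaho-es7 share the same layout: the ElasticSearchBulk step class, the ElasticSearchBulkData data class, the ElasticSearchBulkMeta metadata class, the ElasticSearchBulkDialog dialog class, and a messages bundle for log and notification text. See the source code for details.
3.1.1 Directory Structure Comparison
3.1.2 Maven Configuration Comparison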
app-pentaho-es6 (pom.xml excerpt):
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.7</maven.compiler.source>
  <maven.compiler.target>1.7</maven.compiler.target>
  <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>6.4.2</elasticsearch.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
app-pentaho-es7 (pom.xml excerpt):
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>7.2.0</elasticsearch.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
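3.2 The app-pentaho-es6 Plugin
3.2.1 TransportClient API Overview
The app-pentaho-es6 plugin uses TransportClient, which connects to the cluster remotely through the transport module. Note that Elastic deprecated TransportClient in Elasticsearch 7.0 and removed it entirely in 8.0.
In this client generation, the most common way to obtain an Elasticsearch client is to create a TransportClient connected to the cluster. It does not join the cluster; it takes one or more initial transport addresses and communicates with them in round-robin fashion on each operation.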
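The round-robin behavior described above can be modeled with a small stdlib-only sketch: a selector that cycles through a fixed list of addresses on each call. This is a simplified model for illustration (no node sniffing, failure handling, or retries), not TransportClient's actual implementation; `RoundRobinSelector` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical selector that hands out addresses in round-robin order; thread-safe. */
final class RoundRobinSelector<T> {
    private final List<T> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSelector(List<T> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes));
    }

    /** Returns the next address; floorMod keeps the index valid even after int overflow. */
    T next() {
        return nodes.get(Math.floorMod(counter.getAndIncrement(), nodes.size()));
    }
}
```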
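3.2.2 Client Initialization
(1) Configure the PreBuiltTransportClient with the cluster name, node addresses, ports, and other connection settings, registering each node as a TransportAddress.
(2) Check that the specified index can be reached, then obtain the Client.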
private void initClient() throws UnknownHostException {
    // Copy the step's Settings tab (e.g. cluster.name) into the client settings,
    // resolving Kettle variables in every value
    Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
    Map<String, String> tMetaMap = meta.getSettingsMap();
    for (Entry<String, String> entry : tMetaMap.entrySet()) {
        settingsBuilder.put(entry.getKey(), environmentSubstitute(entry.getValue()));
    }
    PreBuiltTransportClient tClient = new PreBuiltTransportClient(settingsBuilder.build());
    // Register every configured node; the client rotates requests across them
    for (Server server : meta.getServers()) {
        tClient.addTransportAddress(new TransportAddress(
                InetAddress.getByName(environmentSubstitute(server.getAddress())),
                server.getPort()));
    }
    client = tClient;
}
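`initClient()` above passes every setting value and server address through Kettle's `environmentSubstitute()`, so values such as `${ES_HOST}` can come from Kettle variables. The sketch below imitates only the `${NAME}` form of that substitution with a plain `Map` and leaves unknown variables untouched. It is a simplified, hypothetical stand-in for illustration, not Kettle's implementation, which handles more cases.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified stand-in for Kettle variable substitution: replaces ${NAME}. */
final class VariableSubstitution {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    static String substitute(String value, Map<String, String> variables) {
        if (value == null) {
            return null;
        }
        Matcher m = VAR.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = variables.get(m.group(1));
            // Unknown variables are kept verbatim, e.g. "${UNDEFINED}" stays as-is.
            m.appendReplacement(out, Matcher.quoteReplacement(
                    replacement != null ? replacement : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```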
3.2.3 Row Stream Processing
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
        throws KettleException {
    Object[] rowData = getRow();
    if (rowData == null) {
        if (currentRequest != null && currentRequest.numberOfActions() > 0) {
            // didn't fill a whole batch
            processBatch(false);
        }
        setOutputDone();
        return false;
    }
    if (first) {
        first = false;
        setupData();
        currentRequest = client.prepareBulk();
        requestsBuffer = new ArrayList<IndexRequestBuilder>(this.batchSize);
        initFieldIndexes();
    }
    try {
        data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
        return indexRow(data.inputRowMeta, rowData) || !stopOnError;
    } catch (KettleStepException e) {
        throw e;
    } catch (Exception e) {
        rejectAllRows(e.getLocalizedMessage());
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
        logError(msg);
        throw new KettleStepException(msg, e);
    }
}
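`processRow()` follows a common Kettle pattern: buffer incoming rows, send a bulk request whenever a full batch has accumulated (presumably inside `indexRow()`, which is not shown), and flush the remaining partial batch once `getRow()` returns `null`. The stdlib-only sketch below models just that control flow with a hypothetical `RowBatcher` class and a caller-supplied flush callback; it does not talk to Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of the buffer-then-flush flow used by processRow(). */
final class RowBatcher<R> {
    private final int batchSize;
    private final Consumer<List<R>> flush;
    private List<R> buffer;

    RowBatcher(int batchSize, Consumer<List<R>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flush = flush;
        this.buffer = new ArrayList<>(batchSize);
    }

    /** Called once per incoming row; sends the batch as soon as it is full. */
    void add(R row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            sendAndReset();
        }
    }

    /** Called when the input is exhausted (getRow() == null); sends any partial batch. */
    void finish() {
        if (!buffer.isEmpty()) {
            sendAndReset();
        }
    }

    private void sendAndReset() {
        flush.accept(buffer);
        buffer = new ArrayList<>(batchSize); // fresh buffer, like processBatch(true)
    }
}
```

For example, with a batch size of 2 and five rows, the callback receives `[1, 2]`, `[3, 4]`, and finally the partial batch `[5]`.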
3.2.4 Batch Processing
private boolean processBatch(boolean makeNew) throws KettleStepException {
    ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
    boolean responseOk = false;
    BulkResponse response = null;
    try {
        if (timeout != null && timeoutUnit != null) {
            response = actionFuture.actionGet(timeout, timeoutUnit);
        } else {
            response = actionFuture.actionGet();
        }
    } catch (ElasticsearchException e) {
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Error.BatchExecuteFail", e.getLocalizedMessage());
        if (e instanceof ElasticsearchTimeoutException) {
            msg = BaseMessages.getString(PKG, "ElasticSearchBulk.Error.Timeout");
        }
        logError(msg);
        rejectAllRows(msg);
    }
    if (response != null) {
        responseOk = handleResponse(response);
        requestsBuffer.clear();
    } else { // have to assume all failed
        numberOfErrors += currentRequest.numberOfActions();
        setErrors(numberOfErrors);
    }
    // duration += response.getTookInMillis(); //just in trunk..
    if (makeNew) {
        currentRequest = client.prepareBulk();
        data.nextBufferRowIdx = 0;
        data.inputRowBuffer = new Object[batchSize][];
    } else {
        currentRequest = null;
        data.inputRowBuffer = null;
    }
    return responseOk;
}
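In `processBatch()` above, `actionGet(timeout, timeoutUnit)` waits a bounded time for the bulk response, and if no response arrives, every action in the batch is counted as failed. The same policy can be sketched with the JDK's `Future.get(timeout, unit)`. The `BulkCall` class, the `Integer` stand-in for the response, and the error counter are hypothetical simplifications, not Elasticsearch APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical: runs a "bulk" call with an optional timeout; on failure all actions count as errors. */
final class BulkCall {
    private int numberOfErrors;

    /** Returns true if a response arrived in time; a null timeout means wait indefinitely. */
    boolean execute(Callable<Integer> bulk, int actions, Long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(bulk);
            Integer response;
            try {
                response = (timeout != null && unit != null)
                        ? future.get(timeout, unit)
                        : future.get();
            } catch (TimeoutException | ExecutionException e) {
                future.cancel(true);
                response = null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                response = null;
            }
            if (response == null) {
                // Same policy as processBatch(): no response means assume all failed.
                numberOfErrors += actions;
                return false;
            }
            return true;
        } finally {
            executor.shutdownNow();
        }
    }

    int getNumberOfErrors() {
        return numberOfErrors;
    }
}
```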
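3.3 The app-pentaho-es7 Plugin
3.3.1 RestHighLevelClient API Overview
The app-pentaho-es7 plugin is built on RestHighLevelClient, the Java High Level REST Client provided by Elasticsearch, which executes HTTP requests instead of serialized Java requests. The Java client is mainly used to:
(1) perform standard index, get, delete, and search operations on an existing cluster;
(2) perform administrative tasks on a running cluster.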
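Because RestHighLevelClient talks HTTP, a bulk request ultimately travels as a newline-delimited JSON (NDJSON) body sent to the `_bulk` endpoint: one action/metadata line followed by one source line per document, with every line, including the last, terminated by `\n`. The stdlib-only sketch below builds such a body to show what goes over the wire. `BulkBodyBuilder` is hypothetical, and it assumes the documents are already serialized as single-line JSON and that index names and IDs need no JSON escaping (e.g. `kettle-es_2019.07.01`).

```java
/** Hypothetical builder for an Elasticsearch _bulk NDJSON body (illustration only). */
final class BulkBodyBuilder {
    private final StringBuilder body = new StringBuilder();
    private int actions;

    /**
     * Appends one "index" action. sourceJson must already be a single-line JSON object.
     * A null id lets Elasticsearch generate the document ID.
     */
    BulkBodyBuilder index(String index, String id, String sourceJson) {
        body.append("{\"index\":{\"_index\":\"").append(index).append('"');
        if (id != null) {
            body.append(",\"_id\":\"").append(id).append('"');
        }
        body.append("}}\n");
        body.append(sourceJson).append('\n'); // the body must end with a newline
        actions++;
        return this;
    }

    int numberOfActions() {
        return actions;
    }

    String build() {
        return body.toString();
    }
}
```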
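3.3.2 Client Initialization
(1) Set up Elasticsearch authentication with a CredentialsProvider.
(2) Configure the RestHighLevelClient with the address, port, and protocol (http) of each cluster node, and attach the credentials through the setHttpClientConfigCallback callback.
(3) Check that the specified index can be reached, then obtain the Client.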
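Step (1) relies on HTTP Basic authentication: with a `BasicCredentialsProvider`, the underlying Apache HTTP client answers the server's Basic challenge with an `Authorization: Basic <base64(user:password)>` header. The sketch below computes that header value with `java.util.Base64` to show what `es.user` and `es.password` turn into on the wire; the helper name is hypothetical. Basic credentials are only encoded, not encrypted, so HTTPS is advisable outside a trusted network.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper: the HTTP Basic "Authorization" header value for es.user / es.password. */
final class BasicAuthHeader {
    static String value(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }
}
```

For instance, `BasicAuthHeader.value("elastic", "changeme")` yields `Basic ZWxhc3RpYzpjaGFuZ2VtZQ==`.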
private void initClient() throws UnknownHostException {
    Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
    // Resolve Kettle variables in every setting (cluster.name, es.user, es.password, ...)
    meta.getSettingsMap().forEach(
            (key, value) -> settingsBuilder.put(key, environmentSubstitute(value)));
    // es.user / es.password are custom keys read by the plugin for HTTP Basic auth
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(settingsBuilder.get("es.user"),
                    settingsBuilder.get("es.password")));
    // Build ONE client for all configured nodes. Creating a client per server
    // would leak every client but the last and ignore the other addresses.
    List<HttpHost> hosts = new ArrayList<>();
    for (Server server : meta.getServers()) {
        hosts.add(new HttpHost(environmentSubstitute(server.getAddress()),
                server.getPort(), "http"));
    }
    client = new RestHighLevelClient(
            RestClient.builder(hosts.toArray(new HttpHost[0]))
                    .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                            .setDefaultCredentialsProvider(credentialsProvider)));
}
3.3.3 Row Stream Processing
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
        throws KettleException {
    Object[] rowData = getRow();
    if (rowData == null) {
        if (currentRequest != null && currentRequest.numberOfActions() > 0) {
            // flush the last, partially filled batch
            processBatch(false);
        }
        setOutputDone();
        return false;
    }
    if (first) {
        first = false;
        setupData();
        // the REST client has no prepareBulk(); start from an empty BulkRequest
        currentRequest = new BulkRequest();
        requestsBuffer = new ArrayList<IndexRequest>(this.batchSize);
        initFieldIndexes();
    }
    try {
        data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
        return indexRow(data.inputRowMeta, rowData) || !stopOnError;
    } catch (KettleStepException e) {
        throw e;
    } catch (Exception e) {
        rejectAllRows(e.getLocalizedMessage());
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
        logError(msg);
        throw new KettleStepException(msg, e);
    }
}
3.3.4 Batch Processing
private boolean processBatch(boolean makeNew) throws KettleStepException {
    BulkResponse response = null;
    try {
        // Synchronous HTTP call to the _bulk endpoint
        response = client.bulk(currentRequest, RequestOptions.DEFAULT);
    } catch (IOException e1) {
        rejectAllRows(e1.getLocalizedMessage());
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage());
        logError(msg);
        throw new KettleStepException(msg, e1);
    }
    boolean responseOk = false;
    if (response != null) {
        responseOk = handleResponse(response);
        requestsBuffer.clear();
    } else { // have to assume all failed
        numberOfErrors += currentRequest.numberOfActions();
        setErrors(numberOfErrors);
    }
    if (makeNew) {
        // Start an empty request for the next batch. Calling client.bulk() again
        // here would resend the batch that was just written.
        currentRequest = new BulkRequest();
        data.nextBufferRowIdx = 0;
        data.inputRowBuffer = new Object[batchSize][];
    } else {
        currentRequest = null;
        data.inputRowBuffer = null;
    }
    return responseOk;
}
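`handleResponse()` is not shown in the article. In the Elasticsearch client, a `BulkResponse` exposes `hasFailures()` and the per-item results via `getItems()`, where each `BulkItemResponse` reports `isFailed()` and `getFailureMessage()`. Assuming the plugin follows that usual approach, its logic amounts to the stdlib-only sketch below, in which the hypothetical `ItemResult` class stands in for `BulkItemResponse` and the row index is assumed to point back into `data.inputRowBuffer`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for BulkItemResponse: the outcome of one action in a bulk request. */
final class ItemResult {
    final int rowIndex;
    final String failureMessage; // null when the action succeeded

    ItemResult(int rowIndex, String failureMessage) {
        this.rowIndex = rowIndex;
        this.failureMessage = failureMessage;
    }

    boolean isFailed() {
        return failureMessage != null;
    }
}

/** Hypothetical handler: counts failed items and remembers their rows for error handling. */
final class BulkResultHandler {
    private int numberOfErrors;
    private final List<Integer> rejectedRows = new ArrayList<>();

    /** Returns true only if every item succeeded, like the responseOk flag. */
    boolean handle(List<ItemResult> items) {
        boolean ok = true;
        for (ItemResult item : items) {
            if (item.isFailed()) {
                ok = false;
                numberOfErrors++;
                rejectedRows.add(item.rowIndex);
            }
        }
        return ok;
    }

    int getNumberOfErrors() {
        return numberOfErrors;
    }

    List<Integer> getRejectedRows() {
        return rejectedRows;
    }
}
```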
Whichever Elasticsearch version the cluster runs, the client must have the same major version (e.g. 6.x or 7.x) as the nodes in the cluster.
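The compatibility rule above can be checked up front, for example by comparing the client library version with the `version.number` that the cluster reports from its root endpoint (`GET /`). The sketch below compares only the major components of two version strings; the helper is hypothetical and assumes the usual `major.minor.patch` format.

```java
/** Hypothetical check that a client library and a cluster node share the same major version. */
final class MajorVersionCheck {
    /** Extracts the major component, e.g. "7.2.0" -> 7. */
    static int major(String version) {
        int dot = version.indexOf('.');
        String head = dot < 0 ? version : version.substring(0, dot);
        return Integer.parseInt(head.trim());
    }

    static boolean compatible(String clientVersion, String nodeVersion) {
        return major(clientVersion) == major(nodeVersion);
    }
}
```

For the high-level REST client, Elasticsearch's documentation states a slightly stricter guarantee (same major version, and nodes on an equal or greater minor version), so a fuller check would also compare minor versions.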
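4. ElasticSearch Bulk Insert Usage
4.1 General Parameters
① Index: the dynamic index name, i.e. the index prefix plus a dynamic date
② Type: the document type; defaults to _doc
③ Test Index: checks against the live cluster whether the index exists
④ Batch Size: number of rows sent per bulk request
⑤ Stop on error: whether to stop the step when an error occurs
⑥ Batch Timeout: timeout for writing one batch, in seconds
⑦ Id Field: the stream field used as the document ID (doc_id)
⑧ Overwrite if exists: whether to overwrite a document that already exists
⑨ Output Rows: whether to pass the input rows on to the next step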
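How Id Field and Overwrite if exists interact can be sketched in terms of the bulk API's action types: `index` creates or replaces the document with the given `_id`, while `create` fails if that `_id` already exists. The mapping below is an assumption about how such a step could use these options, not code taken from the plugin, and the class name is hypothetical. It also converts the Batch Timeout from seconds to milliseconds.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical mapping of General-tab options onto bulk action settings. */
final class GeneralOptions {
    /**
     * Chooses the bulk action type. Without an Id Field, Elasticsearch generates IDs, so
     * documents never collide and "index" is used; with an ID, "create" refuses to
     * replace an existing document while "index" overwrites it.
     */
    static String actionType(boolean hasIdField, boolean overwriteIfExists) {
        if (!hasIdField || overwriteIfExists) {
            return "index";
        }
        return "create";
    }

    /** Batch Timeout is entered in seconds; convert it for APIs that take milliseconds. */
    static long timeoutMillis(long batchTimeoutSeconds) {
        return TimeUnit.SECONDS.toMillis(batchTimeoutSeconds);
    }
}
```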
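4.2 Servers Parameters
① Address: the addresses of the Elasticsearch cluster nodes
② Port: the matching port (by default 9300 for the transport protocol used by app-pentaho-es6, and 9200 for the HTTP protocol used by app-pentaho-es7)
4.3 Fields (Output Fields)
① Name: the field in the data stream
② Target Name: the corresponding field in the target index mapping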
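The Name to Target Name pairs act as a rename table from stream fields to mapping fields. A minimal sketch, assuming rows arrive as parallel arrays of field names and values (a simplification of Kettle's row metadata plus `Object[]` row) and that stream fields without an entry are skipped; both the `FieldMapper` class and that skipping rule are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical rename step: builds a document keyed by Target Name from one input row. */
final class FieldMapper {
    private final Map<String, String> nameToTarget;

    FieldMapper(Map<String, String> nameToTarget) {
        this.nameToTarget = nameToTarget;
    }

    Map<String, Object> toDocument(String[] fieldNames, Object[] values) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            String target = nameToTarget.get(fieldNames[i]);
            if (target != null) { // unmapped stream fields are skipped in this sketch
                doc.put(target, values[i]);
            }
        }
        return doc;
    }
}
```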
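4.4 Settings Parameters
① cluster.name: the cluster name
② es.user: username for Elasticsearch authentication (a custom parameter name read by the plugin)
③ es.password: password for Elasticsearch authentication (a custom parameter name read by the plugin)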
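5. Summary
If you would like the source code, want to learn more about custom plugins and integration approaches, or have any questions or suggestions about developing or using the plugin, follow the author's account "游走在数据之间" ("Wandering Among Data").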