数据库架构之【ELK(Elasticsearch+Logstash+Kibana)】OLAP 集群方案

ELK 是三个开源项目的首字母缩写,这三个项目分别是:Elasticsearch、Logstash 和 Kibana。它是一套以 Elasticsearch 为核心的 OLAP 集成解决方案。

本方案基于CentOS8系统设计,建议在RedHat/CentOS系统中使用。方案使用服务器及网络资源较多,建议在实施前做好规划工作,有利于部署工作顺利、有序进行。

目录

1.前言

2.集群部署拓扑图

3.Elasticsearch 单点安装和配置
3.1.安装和配置
3.2.安全性扩展
3.3.中文分词扩展

4.Elasticsearch 集群化
4.1.Elasticsearch 集群的概念
4.2.Elasticsearch 集群的示例

5.Kibana 安装和配置

6.Logstash 安装和作业部署

7.Java 开发集成
7.1.ES 编程语言
7.2.ES + Jest 可复用类库
7.3.ELK + Jest 最佳集成方案


1.前言

1、"集群"指的是 Elasticsearch 组成的集群,实现负载均衡,故障转移,实时热备和读写分离。但与 Logstash 和 Kibana 无关。

2、核心组件简介

1)Elasticsearch: 一个搜索和分析引擎,提供搜集、分析、存储数据三大功能。具有分布式,零配置,自动发现,索引自动分片,索引副本机制,Restful风格接口,多数据源,自动搜索负载等特点。Elasticsearch 能够对PB级海量数据进行近实时的处理(从写入到查询的延时≈1秒),且能够支持数百台服务器节点的分布式集群架构。

2)IKAnalyzer :一个开源的,基于Java语言开发的轻量级的中文分词工具包。

2)Logstash: 一个服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到 Elasticsearch 中。Logstash 通过部署JOB来完成工作,JOB工作流程如下:

Logstash JOB 工作流程

3)Kibana: 一个 Elasticsearch 的可视化管理用户接口,使用图形和图表对数据进行可视化。

3、ELK 适用于以下应用场景:

1)普通用于具有全文检索、海量数据检索类型的应用或功能,作为OLAP数据库支撑业务;

2)可以用于数据分析类型的应用或功能,且能够满足较高的实时性要求,作为OLAP数据库支撑业务;

3)可以用于具有高并发读写,弱ACID管理,弱安全性控制,小型文档型数据特点的应用或功能,作为数据库支撑业务。

补充知识:ACID是数据库事务的4个特征,分别是原子性、一致性、隔离性和持久性。


2.集群部署拓扑图

ELK集群部署拓扑图

网络资源规划:

1、ES Node 1 : OLAP 数据库服务器

1)操作系统:CentOS8;

2)IP地址和端口号:192.168.216.128:9200;

3)主机名:ES-1;

4)数据库:Elasticsearch;

5)角色:OLAP 数据库集群节点。

2、ES Node 2 :OLAP 数据库服务器

1)操作系统:CentOS8;

2)IP地址和端口号:192.168.216.129:9200;

3)主机名:ES-2;

4)数据库:Elasticsearch;

5)角色:OLAP 数据库集群节点。

3、ES Node 3 :OLAP 数据库服务器

1)操作系统:CentOS8;

2)IP地址和端口号:192.168.216.130:9200;

3)主机名:ES-3;

4)数据库:Elasticsearch;

5)角色:OLAP 数据库集群节点。

4、Kinaba: 可视化管理客户端

1)操作系统:CentOS8;

2)IP地址和端口号:192.168.216.131:5601;

3)主机名:ESVM;

4)中间件:Kibana;

5)角色:OLAP 数据库可视化管理器。

5、Logstash : 数据 ETL 服务器

1)操作系统:CentOS8;

2)IP地址:192.168.216.132;

3)主机名:ETL;

4)中间件:Logstash;

5)角色:数据采集器。

6、外部数据源: 作为 OLAP 数据分析的外部数据来源数据库,如:PostgreSQL、MySQL 等 RDBMS 数据库 。IP地址:192.168.216.133


3.Elasticsearch 单点安装和配置

3.1.安装和配置

以"ES Node 1"节点为例:

1、打开 Elasticsearch 官方网站下载页面【https://www.elastic.co/cn/downloads/elasticsearch】,下载 Elasticsearch 的编译程序 tar 包到用户主目录中。

Elasticsearch 官方下载页面

2、解压缩编译程序 tar 包到"/usr/local"目录中。

[centos@ES-1 ~]$ sudo tar zxvf elasticsearch-7.6.2-linux-x86_64.tar.gz -C /usr/local
[centos@ES-1 ~]$ ll /usr/local
drwxr-xr-x. 9 root root 155 3月  26 14:36 elasticsearch-7.6.2

3、创建 Elastcsearch 的数据存储目录和日志存储目录。

[centos@ES-1 ~]$ sudo mkdir -p /data/elasticsearch/data
[centos@ES-1 ~]$ sudo mkdir -p /data/elasticsearch/logs

4、创建 ELK 管理用户和组,并设置为程序安装目录、数据存储目录的拥有者。

[centos@ES-1 ~ ]$ sudo id elastic
id: “elastic”:无此用户
[centos@ES-1 ~ ]$ sudo groupadd elastic
[centos@ES-1 ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ES-1 ~ ]$ sudo chown -R elastic:elastic /usr/local/elasticsearch-7.6.2
[centos@ES-1 ~ ]$ sudo chown -R elastic:elastic /data/elasticsearch

ELK组件的管理用户和组(如:"elastic")可以共享使用,创建用户和组之前首先查询一下账户是否存在。如果账户和组已经存在,可能是因为在安装其他ELK组件时已创建,这种情况下不必重复创建用户和组,只需要更改程序安装目录、数据存储目录的拥有者即可。

5、设置 Elastcsearch 的配置文件参数。

使用文本编辑器打开配置文件:

[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml

修改或验证文件中的以下参数并保存:

# --常用设置项--

node.name: es-node-1
# 设置当前ES节点的名称,默认随机指定一个预置列表中名字。

path.data: /data/elasticsearch/data
# 设置索引数据的存储路径,默认是ES根目录下的data文件夹。可以设置多个存储路径,用逗号隔开,例:
# path.data: /path/to/data1,/path/to/data2

path.logs: /data/elasticsearch/logs
# 设置日志文件的存储路径,默认是ES根目录下的logs文件夹 。

network.host: 0.0.0.0
# 设置ES服务的监听网络地址(IPv4 或 IPv6),默认是示例地址。

http.port: 9200
# 设置ES服务的HTTP端口,默认为9200。

transport.tcp.port: 9300
# 设置节点之间交互的TCP端口,默认是9300。

http.cors.enabled: true
http.cors.allow-origin: "*"
# 设置允许HTTP跨域访问

discovery.seed_hosts: ["127.0.0.1", "[::1]"]
# 设置集群节点发现清单,默认为IPv4和IPv6回路地址。

cluster.initial_master_nodes: ["es-node-1"]
# 设置集群初始化主要节点清单,默认是node-1。


# --其他可用设置项--

# cluster.name: elasticsearch-cluster
# 设置的集群名称,默认是elasticsearch。ES通过广播方式自动发现同一网段其他节点,若同一网段下有多个集群,则通过集群名称区分不同的集群。

# node.master: true
# 设置当前节点是否有资格被选举成为主要节点,默认是true。ES默认集群中的第一个节点为主要节点,如果这个节点故障就会重新选举主要节点。

# node.data: true
# 设置当前节点是否存储索引数据,默认为true。

# path.conf: /path/to/conf
# 设置配置文件的存储路径,默认是ES根目录下的config文件夹。

# path.work: /path/to/work
# 设置临时文件的存储路径,默认是ES根目录下的work文件夹。

# path.plugins: /path/to/plugins
# 设置插件的存放路径,默认是ES根目录下的plugins文件夹, 插件在ES里面普遍使用,用来增强原系统核心功能。

# bootstrap.mlockall: true
# 设置为true来锁住JVM内存。

# network.bind_host: 0.0.0.0
# 设置监听的ip地址,可以是IPv4或IPv6的,默认为0.0.0.0。可以绑定这台机器的任何一个IP地址。

# network.publish_host: 0.0.0.0
# 设置其它节点和该节点交互的IP地址,如果不设置它会自动判断,值必须是个真实的IP地址。

# transport.tcp.compress: true
# 设置是否压缩TCP传输时的数据,默认为false。

# http.max_content_length: 100mb
# 设置HTTP协议请求内容的最大容量,默认100mb。

# http.enabled: true
# 设置是否使用HTTP协议对外提供服务,默认为true。

# gateway.type: local
# 设置gateway的类型,默认为local即为本地文件系统。可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器等。

# gateway.recover_after_nodes: 1
# 设置集群中N个节点启动时进行数据恢复,默认为1。

# gateway.recover_after_time: 5m
# 设置初始化数据恢复进程的超时时间,默认是5分钟。

# gateway.expected_nodes: 2
# 设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。

# cluster.routing.allocation.node_initial_primaries_recoveries: 4
# 初始化数据恢复时,并发恢复线程的个数,默认为4。

# cluster.routing.allocation.node_concurrent_recoveries: 2
# 添加删除节点或负载均衡时并发恢复线程的个数,默认为4。

# indices.recovery.max_size_per_sec: 0
# 设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。

# indices.recovery.concurrent_streams: 5
# 设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。

# discovery.zen.ping.timeout: 3s
# 设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。

# discovery.zen.ping.multicast.enabled: true
# 设置是否打开多播发现节点,默认是true。

# discovery.zen.ping.unicast.hosts: ["127.0.0.1", "[::1]"]
# 设置集群中主要节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。

# discovery.zen.minimum_master_nodes: 1
# 设置集群有主要节点资格的数量,默认为1。对于大的集群来说,可以设置大一点的值(2-4)。

# xpack.security.enabled: true
# xpack.security.transport.ssl.enabled: true
# xpack.security.transport.ssl.verification_mode: certificate
# xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
# xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# 启用 X-Pack 安全认证模块,启用前需要导出证书文件,如:elastic-certificates.p12。

6、设置 Elasticsearch 运行的Linux系统参数。

1)在"/etc/sysctl.conf"文件中设置内核参数。

使用文本编辑器创建配置文件:

[centos@ES-1 ~ ]$ sudo gedit /etc/sysctl.conf

在文件中追加内容并保存如下:

vm.max_map_count=655360

2)在"/etc/security/limits.conf"文件中设置用户 SHELL 参数。
使用文本编辑器创建配置文件:

[centos@ES-1 ~ ]$ sudo gedit /etc/security/limits.conf

在文件中追加内容并保存如下:

elastic soft nofile 65536
elastic hard nofile 65536

3)重新启动操作系统。

[centos@ES-1 ~ ]$ sudo reboot

7、配置 Elasticsearch 服务开机自启动。

使用文本编辑器创建配置文件:

[centos@ES-1 ~ ]$ sudo gedit /usr/lib/systemd/system/elasticsearch-server.service

编写文件内容并保存如下:

[Unit]
Description=Elasticsearch Server
After=syslog.target network.target

[Service]
User=elastic
Group=elastic

ExecStart=/usr/local/elasticsearch-7.6.2/bin/elasticsearch
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID

KillMode=mixed
KillSignal=SIGINT

LimitNOFILE=100000
LimitNPROC=100000

[Install]
WantedBy=multi-user.target

设置开机启动:

[centos@ES-1 ~]$ sudo systemctl daemon-reload
[centos@ES-1 ~]$ sudo systemctl enable elasticsearch-server.service

8、启动 Elasticsearch 服务。

[centos@ES-1 ~]$ sudo systemctl start elasticsearch-server.service

9、设置防火墙端口(CentOS8默认安装firewall防火墙),允许"9200","9300"端口(Elasticsearch 默认端口)访问服务器。

[centos@ES-1 ~]$ sudo firewall-cmd --zone=public --add-port=9200/tcp --permanent
[centos@ES-1 ~]$ sudo firewall-cmd --zone=public --add-port=9300/tcp --permanent
[centos@ES-1 ~]$ sudo firewall-cmd --reload

10、验证 Elasticsearch 服务正常运行。使用浏览器客户端访问 Elasticsearch 的 Http 服务接口进行测试,输入:http://<IP>:<PORT>

Elastisearch 服务正常运行反馈消息

11、Elasticsearch 运维管理

1)启动 Elasticsearch 服务(任选一种方式)

[centos@ES-1 ~]$ sudo systemctl start elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch

2)停止 Elasticsearch 服务(任选一种方式)

[centos@ES-1 ~]$ sudo systemctl stop elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo ps -ef | grep Elasticsearch
elastic    3377      1  2 13:15 ?        00:00:37 /usr/local/elasticsearch-7.6.2/jdk/bin/java ...
[centos@ES-1 ~]$ sudo kill -9 3377

3)重启 Elasticsearch 服务

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo ps -ef | grep Elasticsearch
elastic    3377      1  2 13:15 ?        00:00:37 /usr/local/elasticsearch-7.6.2/jdk/bin/java...
[centos@ES-1 ~]$ sudo kill -9 3377
[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch

4)查看 Elasticseach 服务状态

[centos@ES-1 ~]$ sudo systemctl status elasticsearch-server.service

或者

[centos@ES-1 ~]$ sudo netstat -ntap | grep -E "9200|9300"
tcp6       0      0 :::9200                 :::*                    LISTEN      3377/java           
tcp6       0      0 :::9300                 :::*                    LISTEN      3377/java  

5)开启 Elasticsearch 服务开机自启动

[centos@ES-1 ~]$ sudo systemctl enable elasticsearch-server.service

6)禁用 Elasticsearch 服务开机自启动

[centos@ES-1 ~]$ sudo systemctl disable elasticsearch-server.service

在多个实例并行的情况下,全部实例共享使用管理用户和组、Linux内核配置,但每个实例独立安装(包括独立的程序安装目录和数据存储目录)、独立端口、独立启动服务。

注意:其他"ES Node"节点上全部需要按照以上步骤配置。


3.2.安全性扩展

Elasticsearch 集成了 X-Pack 作为安全性组件,直接开启使用即可。

首先完成 Elasticsearch 的安装和配置。以"ES Node 1"节点为例:

1、导出 Elasticsearch X-Pack 证书文件。导出文件命令需要设置证书口令为空。

[centos@ES-1 ~]$ sudo -u elastic /usr/local/elasticsearch-7.6.2/bin/elasticsearch-certutil cert -out /usr/local/elasticsearch-7.6.2/config/elastic-certificates.p12 -pass ""

2、修改 Elastcsearch 的配置文件参数。

使用文本编辑器打开配置文件:

[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml

追加或验证文件中的以下参数并保存:

xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12

3、重新启动 Elasticsearch 服务。

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

4、设置 Elasticsearch 服务内置账号的口令。

[centos@ES-1 ~]$ sudo -u elastic  /usr/local/elasticsearch-7.6.2/bin/elasticsearch-setup-passwords interactive

Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y

Enter password for [elastic]: 
Reenter password for [elastic]: 
Enter password for [apm_system]: 
Reenter password for [apm_system]: 
Enter password for [kibana]: 
Reenter password for [kibana]: 
Enter password for [logstash_system]: 
Reenter password for [logstash_system]: 
Enter password for [beats_system]: 
Reenter password for [beats_system]: 
Enter password for [remote_monitoring_user]: 
Reenter password for [remote_monitoring_user]: 

Changed password for user [apm_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]

5、验证 X-Pack 认证。使用浏览器客户端访问 Elasticsearch 的 Http 服务接口进行测试,输入:http://<IP>:<PORT>

X-Pack 认证界面

注意:其他 "ES Node" 节点上不需要执行此步骤。但是需要将此服务器上已经创建的 "elastic-certificates.p12" 文件和 "elasticsearch.keystore" 文件拷贝到其他"ES Node"节点的同一位置上,一致使用。即:集群中所有节点的安全认证文件是一致的。


3.3.中文分词扩展

在中文环境下使用 Elasticsearch 一般是需要扩展中文分词器,建议使用 IkAnalyzer 。IKAnalyzer 是一个开源的,基于 Java 语言开发的轻量级的中文分词工具包。

首先完成 Elasticsearch 的安装和配置。以"ES Node 1"节点为例:

1、从【https://github.com/medcl/elasticsearch-analysis-ik/releases】下载 IKAnalyzer 的编译程序 zip 包到用户主目录中。使用的 IKAnalyzer 分词器的版本应与 Elasticsearch 的版本保持一致。

IKAnalyzer 下载页面

2、解压缩编译程序 zip 包到 Elasticsearch 程序安装目录下的"plugins"目录下,并设置 ELK 管理用户和组为拥有者。

[centos@ES-1 ~]$ sudo unzip elasticsearch-analysis-ik-7.6.2.zip -d /usr/local/elasticsearch-7.6.2/plugins/ik
[centos@ES-1 ~]$ sudo chown -R elastic:elastic /usr/local/elasticsearch-7.6.2/plugins/ik

3、重新启动 Elasticsearch 服务。

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

4、验证 IkAnalyzer 插件集成。使用浏览器客户端访问 Elasticsearch 的 Http 服务接口进行测试,输入:http://<IP>:<PORT>/_cat/plugins

IkAnalyzer 正确集成反馈消息

注意:其他"ES Node"节点上全部需要按照以上步骤配置。


4.Elasticsearch 集群化

4.1.Elasticsearch 集群的概念

1、Elasticsearch 集群的组成

1)节点(Node)和 集群(Cluster):节点是单个 Elastcsearch 服务,一般部署在单独的服务器上。如果这些单个 Elasticsearch 服务的配置文件(elasticsearch.yml 文件) 中具有相同的"cluster.name"设置值,那就自动发现这些节点并组成一个集群,集群中允许有1个或者大于1个的单数节点组成。

2)文档(Document)和 索引(Index):文档是一组由 JSON 结构组成的结构化数据,它由若干属性和值组成,而索引是存储文档的库。可以将索引比作 RDBMS 中仅有一个"表"的数据库,文档是"表"中的"行",而文档属性是"行"中的"列"。

3)shard(分片):Elasticsearch 将一个索引切分为多个分片,在集群的节点中分布存储,构成分布式搜索机制。因此 ES 具有横向扩展的能力,可以存储大量数据并且让搜索和分析等操作分布到多台服务器上去执行,提升吞吐量和性能。分片数量在建立索引时一次设置,不能修改,默认5个

4)shard replica(分片副本):Elasticsearch 允许每个分片创建多个副本。分片副本可以在多个节点中冗余数据,以保障在节点故障时数据不被丢失,同时可以提升搜索操作的吞吐量和性能。分片副本数量可以随时修改,默认1个

2、Elasticsearch 集群节点的类型

1)主节点:主节点负责创建索引、删除索引、分配分片、追踪集群中的节点状态等工作。Elasticsearch 中的主节点的工作量相对较轻,用户的请求可以发往集群中任何一个节点,由该节点负责分发和返回结果,而不需要经过主节点转发。而主节点是由候选主节点通过 ZenDiscovery 机制选举出来的。

2)候选主节点:在 Elasticsearch 集群初始化或者主节点宕机的情况下,从候选主节点中选举其中一个作为主节点。候选主节点的 Elasticsearch 服务的配置文件(elasticsearch.yml 文件)设置项特点:

node.master: true
# 设置当前节点是否有资格被选举成为主要节点,默认是true。ES默认集群中的第一个节点为主要节点,如果这个节点故障就会重新选举主要节点。

node.data: false
# 设置当前节点是否存储索引数据,默认为true。

3)数据节点:数据节点负责数据的存储和相关具体操作,比如CRUD、搜索、聚合。所以,数据节点对机器配置要求比较高,首先需要有足够的磁盘空间来存储数据,其次数据操作对系统CPU、Memory和IO的性能消耗都很大。数据节点的 Elasticsearch 服务的配置文件(elasticsearch.yml 文件)设置项特点:

node.master: false
# 设置当前节点是否有资格被选举成为主要节点,默认是true。ES默认集群中的第一个节点为主要节点,如果这个节点故障就会重新选举主要节点。

node.data: true
# 设置当前节点是否存储索引数据,默认为true。

4)候选主节点+数据节点:一个节点可以同时成为候选主节点和数据节点,主需要在候选主节点配置的基础上开启数据节点即可。候选主节点+数据节点的 Elasticsearch 服务的配置文件(elasticsearch.yml 文件)设置项特点:

node.master: true
# 设置当前节点是否有资格被选举成为主要节点,默认是true。ES默认集群中的第一个节点为主要节点,如果这个节点故障就会重新选举主要节点。

node.data: true
# 设置当前节点是否存储索引数据,默认为true。

5)客户端节点:客户端节点就是既不做候选主节点也不做数据节点的节点,只负责请求的分发、汇总。这是所有节点的共性能力,单独增加这样的节点一般是为了负载均衡。数据节点的 Elasticsearch 服务的配置文件(elasticsearch.yml 文件)设置项特点:

node.master: false
# 设置当前节点是否有资格被选举成为主要节点,默认是true。ES默认集群中的第一个节点为主要节点,如果这个节点故障就会重新选举主要节点。

node.data: false
# 设置当前节点是否存储索引数据,默认为true。

4.2.Elasticsearch 集群的示例

首先完成 Elasticsearch 的安装和配置。假设按照本文规划的集群拓扑图,所有集群节点都是"候选主节点+数据节点"的类型,以"ES Node 1"节点为例:

1、设置 Elastcsearch 的配置文件参数。

使用文本编辑器打开配置文件:

[centos@ES-1 ~]$ sudo gedit /usr/local/elasticsearch-7.6.2/config/elasticsearch.yml

修改或验证文件中的以下参数并保存:

# -- 必须一致的配置 --

cluster.name: elasticsearch-cluster
# 设置的集群名称,同一集群的节点应该设置相同的集群名称。

cluster.initial_master_nodes: ["es-node-1","es-node-2"]
# 设置集群初始化主节点清单。本例中将【es-node-1】和【es-node-2】节点设置为初始化主节点。

discovery.send_hosts: ["192.168.216.128:9300", "192.168.216.129:9300"]
# 设置集群节点发现清单,一般是候选主节点IP地址,不写端口号时默认9300端口。

discovery.zen.minimum_master_nodes: 2
# 设置集群候选主节点的最少数量。

# -- 各节点的个性化配置 --

node.name: es-node-1
# 设置当前ES节点的名称,每个节点应设置不同的名称。

node.master: true
# 设置当前节点是候选主节点。

node.data: true
# 设置当前节点是数据节点。

path.data: /data/elasticsearch/data
# 设置索引数据的存储路径,默认是ES根目录下的data文件夹。可以设置多个存储路径,用逗号隔开,例:
# path.data: /path/to/data1,/path/to/data2

path.logs: /data/elasticsearch/logs
# 设置日志文件的存储路径,默认是ES根目录下的logs文件夹 。

network.host: 0.0.0.0
# 设置ES服务的监听网络地址(IPv4 或 IPv6),默认是示例地址。

http.port: 9200
# 设置ES服务的HTTP端口,默认为9200。

transport.tcp.port: 9300
# 设置节点之间交互的TCP端口,默认是9300。

http.cors.enabled: true
http.cors.allow-origin: "*"
# 设置允许HTTP跨域访问

# xpack.security.enabled: true
# xpack.security.transport.ssl.enabled: true
# xpack.security.transport.ssl.verification_mode: certificate
# xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
# xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# 启用 X-Pack 安全认证模块,集群中所有节点的安全认证文件是一致的。
# 只需要在其中一个节点中创建认证文件(elastic-certificates.p12 文件和 elasticsearch.keystore 文件),其他节点只需要拷贝这两个文件即可,注意要存放在一致的目录下。

2、认证文件一致化设置。

假设在 "ES Node 1" 上已经创建安全认证文件 "elastic-certificates.p12" 和 "elasticsearch.keystore" ,这些文件存放在 Elasticsearch 程序安装目录下的 "config" 目录中(本例为:/usr/local/elasticsearch-7.6.2/config 目录)。

将两个文件分别拷贝至其他 "ES Node" 节点上的 Elasticsearch 程序安装目录下的 "config" 目录中(本例为:/usr/local/elasticsearch-7.6.2/config 目录)。

注意:除上述操作以外,无需进行任何有关安全性扩展的配置。

3、重启启动集群。

首先启动"ES Node 1"节点,然后依次启动其他"ES Node"节点。

[centos@ES-1 ~]$ sudo systemctl restart elasticsearch-server.service

[centos@ES-2 ~]$ sudo systemctl restart elasticsearch-server.service

[centos@ES-3 ~]$ sudo systemctl restart elasticsearch-server.service

4、验证集群。使用浏览器客户端访问 Elasticsearch 的 Http 服务接口进行测试,输入:http://<IP>:<PORT>/_cluster/health

Elasticsearch 集群验证页面

5.Kibana 安装和配置

1、打开 Kibana 官方网站下载页面【https://www.elastic.co/cn/downloads/kibana】,下载 Kibana 的编译程序 tar 包到用户主目录中。

Kibana 官方下载页面

2、解压缩编译程序 tar 包到"/usr/local"目录中。

[centos@ESVM ~]$ sudo tar zxvf kibana-7.6.2-linux-x86_64.tar.gz -C /usr/local
[centos@ESVM ~]$ ll /usr/local
drwxr-xr-x. 13 root    root    266 4月  20 15:25 kibana-7.6.2-linux-x86_64

3、创建 ELK 管理用户和组,并设置为程序安装目录的拥有者。

[centos@ESVM ~ ]$ sudo id elastic
id: “elastic”:无此用户
[centos@ESVM ~ ]$ sudo groupadd elastic
[centos@ESVM ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ESVM ~ ]$ sudo chown -R elastic:elastic /usr/local/kibana-7.6.2-linux-x86_64

ELK组件的管理用户和组(如:"elastic")可以共享使用,创建用户和组之前首先查询一下账户是否存在。如果账户和组已经存在,可能是因为在安装其他ELK组件时已创建,这种情况下不必重复创建用户和组,只需要更改程序安装目录、数据存储目录的拥有者即可。

4、设置 Kibana 的配置文件参数。

使用文本编辑器打开配置文件:

[centos@ESVM ~]$ sudo gedit /usr/local/kibana-7.6.2-linux-x86_64/config/kibana.yml

修改或验证文件中的以下参数并保存:

server.port: 5601
# 服务监听端口。

server.host: "192.168.216.131"
# 服务监听地址。

server.name: "ESVM Kibana"
# 服务的显示名称。

elasticsearch.hosts: ["http://192.168.216.128:9200","http://192.168.216.129:9200","http://192.168.216.130:9200"]
# ElasticSearch 集群 URLs 集合。

kibana.index: ".kibana"
# Kibana 在 ElasticSearch 中存储搜索、可视化和仪表板数据的索引名字。

elasticsearch.username: "elastic"
# Kibana 访问 ElasticSearch 的账号,"elastic" 账号是 Elasticsearch 的预置用户。

elasticsearch.password: "password"
# Kibana 访问 ElasticSearch 账号的口令。

i18n.locale: "zh-CN"
# 本地化区域语言设置,支持英文和中文,默认是中文。

5、配置 Kibana 服务开机自启动。

使用文本编辑器创建配置文件:

[centos@ESVM ~ ]$ sudo gedit /usr/lib/systemd/system/kibana-server.service

编写文件内容并保存如下:

[Unit]
Description=Kibana Server
After=syslog.target network.target

[Service]
User=elastic
Group=elastic

ExecStart=/usr/local/kibana-7.6.2-linux-x86_64/bin/kibana
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID

KillMode=mixed
KillSignal=SIGINT

LimitNOFILE=100000
LimitNPROC=100000

[Install]
WantedBy=multi-user.target

设置开机启动:

[centos@ESVM ~]$ sudo systemctl daemon-reload
[centos@ESVM ~]$ sudo systemctl enable kibana-server.service

6、启动 Kibana 服务。

[centos@ESVM ~]$ sudo systemctl start kibana-server.service

7、设置防火墙端口(CentOS8默认安装firewall防火墙),允许"5601"端口(Kibana 默认端口)访问服务器。

[centos@ESVM ~]$ sudo firewall-cmd --zone=public --add-port=5601/tcp --permanent
[centos@ESVM ~]$ sudo firewall-cmd --reload

8、验证 Kibana 服务正常运行。使用浏览器客户端访问 Kibana 的 Http 服务接口进行测试,输入:http://<IP>:<PORT>

Kibana 登录页面
Kibana 主页面

9、Kibana 运维管理

1)启动 Kibana 服务(任选一种方式)

[centos@ESVM ~]$ sudo systemctl start kibana-server.service

或者

[centos@ESVM ~]$ sudo -u elastic /usr/local/kibana-7.6.2-linux-x86_64/bin/kibana

2)停止 Kibana 服务(任选一种方式)

[centos@ESVM ~]$ sudo systemctl stop kibana-server.service

或者

[centos@ESVM ~]$ sudo ps -ef | grep kibana
elastic   12268      1  1 4月20 ?       00:12:00 /usr/local/kibana-7.6.2-linux-x86_64/bin/..
[centos@ESVM ~]$ sudo kill -9 12268

3)重启 Kibana 服务

[centos@ESVM ~]$ sudo systemctl restart kibana-server.service

或者

[centos@ESVM ~]$ sudo ps -ef | grep kibana
elastic   12268      1  1 4月20 ?       00:12:00 /usr/local/kibana-7.6.2-linux-x86_64/bin/..
[centos@ESVM ~]$ sudo kill -9 12268
[centos@ESVM ~]$ sudo -u elastic /usr/local/kibana-7.6.2-linux-x86_64/bin/kibana

4)查看 Kibana 服务状态

[centos@ESVM ~]$ sudo systemctl status kibana-server.service

或者

[centos@ESVM ~]$ sudo netstat -ntap | grep 5601
tcp        0      0 192.168.216.131:5601    0.0.0.0:*               LISTEN      12268/node

5)启动 Kibana 服务开机自启动

[centos@ESVM ~]$ sudo systemctl enable kibana-server.service

6)禁用 Kibana 服务开机自启动

[centos@ESVM ~]$ sudo systemctl disable kibana-server.service

6.Logstash 安装和作业部署

1、安装 OpenJDK 1.8 或 OracleJDK 1.8。

1)使用 YUM 源安装OpenJDK 1.8。

[centos@ETL ~]$ sudo dnf install java-1.8.0-openjdk

2)验证Java运行环境。

[centos@ETL ~]$ java -vserion
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

2、打开 Logstash 官方网站下载页面【https://www.elastic.co/cn/downloads/logstash】,下载 Logstash 的编译程序 tar 包到用户主目录中。

Logstash 官方下载页面

3、解压缩编译程序 tar 包到"/usr/local"目录中。

[centos@ETL ~]$ sudo tar zxvf logstash-7.6.2.tar.gz -C /usr/local
[centos@ETL ~]$ ll /usr/local
drwxr-xr-x. 12     631     503 255 3月  26 17:42 logstash-7.6.2

4、创建 Logstash 的数据、日志、作业配置存储目录。

[centos@ETL ~]$ sudo mkdir -p /data/logstash/data
[centos@ETL ~]$ sudo mkdir -p /data/logstash/logs
[centos@ETL ~]$ sudo mkdir -p /data/logstash/config

5、创建 ELK 管理用户和组,并设置为程序安装目录、数据存储目录的拥有者。

[centos@ETL ~ ]$ sudo id elastic
id: “elastic”:无此用户
[centos@ETL ~ ]$ sudo groupadd elastic
[centos@ETL ~ ]$ sudo useradd -g elastic -s /bin/false elastic
[centos@ETL ~ ]$ sudo chown -R elastic:elastic /usr/local/logstash-7.6.2
[centos@ETL ~ ]$ sudo chown -R elastic:elastic /data/logstash

ELK组件的管理用户和组(如:"elastic")可以共享使用,创建用户和组之前首先查询一下账户是否存在。如果账户和组已经存在,可能是因为在安装其他ELK组件时已创建,这种情况下不必重复创建用户和组,只需要更改程序安装目录、数据存储目录的拥有者即可。

6、设置 Logstash 的配置文件参数。

使用文本编辑器打开配置文件:

[centos@ETL ~]$ sudo gedit /usr/local/logstash-7.6.2/config/logstash.yml

修改或验证文件中的以下参数并保存:

# --常用设置项--

path.data: /data/logstash/data
# 插件数据持久化目录,默认为程序安装目录下的"data"目录。

path.logs: /data/logstash/logs
# 设置日志文件目录,默认输出到控制台。

path.config: /data/logstash/config
# 设置管道配置文件目录。

config.reload.automatic: true
# 开启管道配置文件自动加载,默认为false。

config.reload.interval: 3s
# 设置管道配置文件检查更新的时间。

# --其他可用设置项--

# node.name: ETL Logstash
# 节点名称,默认为主机名称。

# pipeline.workers: 2
# 输出通道的工作workers数据量(提升输出效率)默认为CPU核数。

# pipeline.batch.size: 125
# 每批次输入数据的条数,默认125条。

# pipeline.batch.delay: 50
# 每批次延时等待的时间(单位毫秒),默认50毫秒。

# http.host: "127.0.0.1"
# 用户指标收集REST服务绑定主机地址,默认为"127.0.0.1"。

# http.port: 9600-9700
# 用户指标收集REST服务绑定主机端口,默认为9600-9700。

# log.level: info
# 日志输出级别,如果config.debug开启,这里一定要是debug日志 ,默认为 info。

# path.plugins: []
# 自定义插件目录

7、配置 ETL 作业。

以从 PostgreSQL 数据库中抽取数据,并写入 Elasticsearch 中为例:

1)下载 PostgreSQL JDBC 的 jar 包到指定位置。打开 PostgreSQL JDBC 官方网站下载页面【https://jdbc.postgresql.org/download.html】,下载 PostgreSQL JDBC 的 jar 包到用户主目录中。**

PostgreSQL JDBC 的 jar 包下载页面

2)将 PostgreSQL JDBC 的 jar 包拷贝到 Logstash 程序目录中的指定位置:

[centos@ETL ~]$ sudo -u elastic cp postgresql-42.2.12.jar /usr/local/logstash-7.6.2/logstash-core/lib/jars

3)在 Logstash 的作业配置存储目录中创建 JOB 配置文件。

使用文本编辑器创建配置文件:

[centos@ETL ~]$ sudo -u elastic gedit /data/logstash/config/pgsql.conf

编写以下配置脚本并保存:

input {
    stdin {
    }
    jdbc {
          # 数据库 JDBC 的 jar 包位置,JDBC 的 jar 包需要自行下载准备
          jdbc_driver_library => "/usr/local/logstash-7.6.2/logstash-core/lib/jars/postgresql-42.2.12.jar"
          # 数据库驱动
          jdbc_driver_class => "org.postgresql.Driver"
          # 数据库连接字符串
          jdbc_connection_string => "jdbc:postgresql://192.168.216.133:5432/pgsql"
          # 数据库用户名密码
          jdbc_user => "postgres"
          jdbc_password => "postgres"
          # 数据库重连尝试次数
          connection_retry_attempts => "3"
          # 判断数据库连接是否可用,默认false不开启
          jdbc_validate_connection => "true"
          # 数据库连接可用校验超时时间,默认3600S
          jdbc_validation_timeout => "3600"
          # 开启分页查询(默认false不开启)
          jdbc_paging_enabled => "true"
          # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
          jdbc_page_size => "5000"
          # sql脚本文件或脚本
          # statement_filepath => "pgsql.sql"
          statement => "select row_number() over(order by last_modify_dt  asc) as rn,* from table where last_modify_dt >= :sql_last_value"
          # 是否将列名转为小写,默认为true
          lowercase_column_names => "false"
          # 定期执行周期,"*"从左到右依次代表: 分 时 天 月 年  
          schedule => "* * * * *"
          # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
          record_last_run => "true"
          # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
          use_column_value => "true"
          # 如果 use_column_value 为真,需配置数据表增量字段的类型,只能是"numeric"或"timestamp"
          tracking_column_type => "timestamp"
              # 如果 use_column_value 为真,需配置数据表增量字段名,该字段必须是递增的. 一般序号或时间戳
          tracking_column => "last_modify_dt"
          last_run_metadata_path => "/data/logstash/config/sql_last_value"
    }
}
filter {
    mutate {
        # 将 keyword_tag 字段的内容通过 "," 分割成数组
        split => { "keyword_tag" => "," }
    }
    mutate {
        # 将 taxonomy 字段的内容中的前缀"/"和"//"替换为""。
        gsub => ["taxonomy", "^/(/)?", ""]
    }
}
output {
    elasticsearch {
        # Elasticsearch 服务IP和端口,账号和密码
        hosts => ["192.168.216.128:9200", "192.168.216.129:9200", "192.168.216.130:9200"]
        user => "elastic"
        password => "elastic"
        # 索引名
        index => "pgsql_index"
        # 需要关联的数据表中有一个uuid字段,对应索引文档的id号
        document_id => "%{uuid}"
    }
    stdout {
        codec => rubydebug
    }
}

4)验证 JOB 配置文件。

[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash -f /data/logstash/config/pgsql.conf -t

其他配置请参加官方文档:

注意:每个作业编写独立的配置脚本。

8、配置 Logstash 开机自启动。

使用文本编辑器创建配置文件:

[centos@ETL ~ ]$ sudo gedit /usr/lib/systemd/system/logstash.service

编写文件内容并保存如下:

[Unit]
Description=Logstash
After=syslog.target network.target

[Service]
User=elastic
Group=elastic

ExecStart=/usr/local/logstash-7.6.2/bin/logstash
ExecStop=/bin/kill -HUP $MAINPID
ExecReload=/bin/kill -HUP $MAINPID

KillMode=mixed
KillSignal=SIGINT

LimitNOFILE=100000
LimitNPROC=100000

[Install]
WantedBy=multi-user.target

设置开机启动:

[centos@ETL ~]$ sudo systemctl daemon-reload
[centos@ETL ~]$ sudo systemctl enable logstash.service

9、启动 Logstash 服务。

[centos@ETL ~]$ sudo systemctl start logstash.service

10、Logstash 运维管理

1)启动 Logstash 服务(任选一种方式)

[centos@ETL ~]$ sudo systemctl start logstash.service

或者

[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash

2)停止 Logstash 服务(任选一种方式)

[centos@ETL ~]$ sudo systemctl stop logstash.service

或者

[centos@ETL ~]$ sudo ps -ef | grep logstash
elastic   26393  26391  5 11:01 ?        00:01:38 /bin/java...
[centos@ETL ~]$ sudo kill -9 26393

3)重启 Logstash 服务

[centos@ETL ~]$ sudo systemctl restart logstash.service

或者

[centos@ETL ~]$ sudo ps -ef | grep logstash
elastic   26393  26391  5 11:01 ?        00:01:38 /bin/java...
[centos@ETL ~]$ sudo kill -9 26393
[centos@ETL ~]$ sudo -u elastic /usr/local/logstash-7.6.2/bin/logstash

4)查看 Logstash 服务状态

[centos@ETL ~]$ sudo systemctl status logstash.service

或者

[centos@ETL ~]$ sudo -u elastic tail /data/logstash/logs/logstash-plain.log

5)启动 Logstash 服务开机自启动

[centos@ETL ~]$ sudo systemctl enable logstash.service

6)禁用 Logstash 服务开机自启动

[centos@ETL ~]$ sudo systemctl disable logstash.service

7.Java 开发集成

7.1.ES 编程语言


7.2.ES + Jest 可复用类库

JestClient 是 Elasticsearch 的 JavaAPI 包,以下是 Maven 项目的程序案例。

1、从 Maven 库中引入 JestClient 包。

<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest-common</artifactId>
    <version>6.3.1</version>
</dependency>

2、编写 Elasticsearch 会话工厂类 "SearchSessionFacade.java"。

/**
 * 
 * ElasticSearch Libs
 * 
 * ©2020 张毅
 * 
 */

import com.google.gson.GsonBuilder;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

/**
 * ElasticSearch 会话工厂类 (ElasticSearch Session Factory)
 */
public class SearchSessionFactory {

    /** 请求服务地址 */
    private String[] serverUris;

    /** 是否启用多线程 */
    private boolean multiThreaded = true;

    /** 连接超时时长 */
    private int connTimeout = 10000;

    /** 数据读取超时时长 */
    private int readTimeout = 10000;

    /** 日期格式 */
    private String dateFormat = "yyyy-MM-dd HH:mm:ss";

    /** 访问账号 */
    private String user;

    /** 访问密码 */
    private String pass;

    /** Jest客户端 */
    private JestClient jestClient;

    /**
     * 获取请求服务地址。
     * 
     * @return 请求服务地址。
     */
    public String[] getServerUris() {
        return serverUris;
    }

    /**
     * 设置请求服务地址。
     * 
     * @param serverUris 请求服务地址。
     */
    public void setServerUris(String[] serverUris) {
        this.serverUris = serverUris;
    }

    /**
     * 获取是否启用多线程。
     * 
     * @return 启用多线程返回true,否则返回false。
     */
    public boolean isMultiThreaded() {
        return multiThreaded;
    }

    /**
     * 设置是否启用多线程。
     * 
     * @param multiThreaded 设置为true启用,false停用。
     */
    public void setMultiThreaded(boolean multiThreaded) {
        this.multiThreaded = multiThreaded;
    }

    /**
     * 获取连接超时时长(毫秒)。
     * 
     * @return 连接超时时长(毫秒)。
     */
    public int getConnTimeout() {
        return connTimeout;
    }

    /**
     * 设置连接超时时长(毫秒)。
     * 
     * @param connTimeout 连接超时时长(毫秒)。
     */
    public void setConnTimeout(int connTimeout) {
        this.connTimeout = connTimeout;
    }

    /**
     * 获取数据读取超时时长(毫秒)。
     * 
     * @return 数据读取超时时长(毫秒)。
     */
    public int getReadTimeout() {
        return readTimeout;
    }

    /**
     * 设置数据读取超时时长(毫秒)。
     * 
     * @param readTimeout 数据读取超时时长(毫秒)。
     */
    public void setReadTimeout(int readTimeout) {
        this.readTimeout = readTimeout;
    }

    /**
     * 获取日期格式。
     * 
     * @return 日期格式。
     */
    public String getDateFormat() {
        return dateFormat;
    }

    /**
     * 设置日期格式。
     * 
     * @param dateFormat 日期格式。
     */
    public void setDateFormat(String dateFormat) {
        this.dateFormat = dateFormat;
    }

    /**
     * 获取访问账号。
     * 
     * @return 访问账号。
     */
    public String getUser() {
        return user;
    }

    /**
     * 设置访问账号。
     * 
     * @param user 访问账号。
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * 获取访问密码。
     * 
     * @return 访问密码。
     */
    public String getPass() {
        return pass;
    }

    /**
     * 设置访问访问密码。
     * 
     * @param pass 访问密码。
     */
    public void setPass(String pass) {
        this.pass = pass;
    }

    /**
     * 获取Jest连接客户端。
     * 
     * @return Jest客户端。
     */
    public JestClient getJestClient() {

        if (jestClient == null) {
            jestClient = this.createJestClient();
        }
        return jestClient;
    }

    /**
     * 创建Jest客户端实例。
     * 
     * @return Jest客户端实例。
     */
    protected JestClient createJestClient() {

        JestClientFactory factory = new JestClientFactory();
        if (this.getUser() != null && !this.getUser().equals("") && this.getPass() != null
                && !this.getPass().equals("")) {
            factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris).multiThreaded(multiThreaded)
                    .connTimeout(connTimeout).readTimeout(readTimeout)
                    .gson(new GsonBuilder().setDateFormat(dateFormat).create()).defaultCredentials(user, pass).build());
            this.jestClient = factory.getObject();
        } else {
            factory.setHttpClientConfig(new HttpClientConfig.Builder(serverUris).multiThreaded(multiThreaded)
                    .connTimeout(connTimeout).readTimeout(readTimeout)
                    .gson(new GsonBuilder().setDateFormat(dateFormat).create()).build());
            this.jestClient = factory.getObject();
        }

        return this.jestClient;
    }
}

3、编写 Elasticsearch 会话类 "SearchSession.java"。

/**
 * 
 * ElasticSearch Libs
 * 
 * ©2020 张毅
 * 
 */

import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.google.gson.JsonObject;

import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.cluster.GetSettings;
import io.searchbox.cluster.Health;
import io.searchbox.cluster.NodesInfo;
import io.searchbox.cluster.NodesStats;
import io.searchbox.core.Bulk;
import io.searchbox.core.Count;
import io.searchbox.core.CountResult;
import io.searchbox.core.Delete;
import io.searchbox.core.DeleteByQuery;
import io.searchbox.core.Get;
import io.searchbox.core.Index;
import io.searchbox.core.MultiGet;
import io.searchbox.core.Search;
import io.searchbox.core.Search.Builder;
import io.searchbox.core.SearchResult;
import io.searchbox.core.SearchScroll;
import io.searchbox.core.Update;
import io.searchbox.core.UpdateByQuery;
import io.searchbox.indices.ClearCache;
import io.searchbox.indices.CloseIndex;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.Flush;
import io.searchbox.indices.IndicesExists;
import io.searchbox.indices.OpenIndex;
import io.searchbox.indices.Optimize;
import io.searchbox.indices.aliases.AddAliasMapping;
import io.searchbox.indices.aliases.AliasExists;
import io.searchbox.indices.aliases.GetAliases;
import io.searchbox.indices.aliases.ModifyAliases;
import io.searchbox.indices.aliases.RemoveAliasMapping;
import io.searchbox.indices.mapping.DeleteMapping;
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.PutMapping;
import io.searchbox.indices.settings.UpdateSettings;
import io.searchbox.indices.template.DeleteTemplate;
import io.searchbox.indices.template.GetTemplate;
import io.searchbox.indices.template.PutTemplate;
import io.searchbox.params.Parameters;

/**
 * ElasticSearch 会话类 (ElasticSearch Session)
 */
public class SearchSession {

    /** Jest客户端 */
    protected final JestClient jestClient;

    /**
     * 获取Jest连接客户端。
     * 
     * @return Jest客户端。
     */
    protected JestClient getJestClient() {

        return jestClient;
    }

    /**
     * 构造器。
     * 
     * @param factory ElasticSearch创建工厂。
     */
    public SearchSession(SearchSessionFactory factory) {

        this.jestClient = factory.getJestClient();
    }

    /**
     * 执行批量事务。
     * 
     * @param actions 事务动作集合。
     * @return 执行结果模型。
     */
    @SuppressWarnings("rawtypes")
    public JestResult executeBulk(Collection<? extends BulkableAction> actions) {

        try {
            Bulk bulk = new Bulk.Builder().addAction(actions).build();
            JestResult jestResult = jestClient.execute(bulk);
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 优化。
     * 
     * @return 执行结果模型。
     */
    public JestResult optimize() {

        try {
            JestResult jestResult = jestClient.execute(new Optimize.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 刷新。
     * 
     * @return 执行结果模型。
     */
    public JestResult flush() {

        try {
            JestResult jestResult = jestClient.execute(new Flush.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 清除缓存。
     * 
     * @return 执行结果模型。
     */
    public JestResult clearCache() {

        try {
            JestResult jestResult = jestClient.execute(new ClearCache.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取集群健康信息。
     * 
     * @return 执行结果模型。
     */
    public JestResult getClusterHealth() {

        try {
            JestResult jestResult = jestClient.execute(new Health.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取节点信息。
     * 
     * @return 执行结果模型。
     */
    public JestResult getNodesInfo() {

        try {
            JestResult jestResult = jestClient.execute(new NodesInfo.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取节点状态信息。
     * 
     * @return 执行结果模型。
     */
    public JestResult getNodesStats() {

        try {
            JestResult jestResult = jestClient.execute(new NodesStats.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新配置。
     * 
     * @param setting 配置更新脚本JSON字符串。。
     * @return 执行结果模型。
     */
    public JestResult updateSettings(String setting) {

        try {
            JestResult jestResult = jestClient.execute(new UpdateSettings.Builder(setting).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新配置。
     * 
     * @param setting 配置更新脚本JSON模型。
     * @return 执行结果模型。
     */
    public JestResult updateSettings(JsonObject setting) {

        return this.updateSettings(setting.toString());
    }

    /**
     * 获取配置。
     * 
     * @return 执行结果模型。
     */
    public JestResult geSettings() {
        try {
            JestResult jestResult = jestClient.execute(new GetSettings.Builder().build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建索引。
     * 
     * @param index 索引名字。
     * @return 执行结果模型。
     */
    public JestResult createIndex(String index) {

        try {
            if (this.indicesExists(index).isSucceeded()) {
                this.deleteIndex(index);
            }
            JestResult jestResult = jestClient.execute(new CreateIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除索引。
     * 
     * @param index 索引名字。
     * @return 执行结果模型。
     */
    public JestResult deleteIndex(String index) {

        try {
            JestResult jestResult = jestClient.execute(new DeleteIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 判断索引是否存在。
     * 
     * @param index 索引名字。
     * @return 执行结果模型。
     */
    public JestResult indicesExists(String index) {
        try {
            JestResult jestResult = jestClient.execute(new IndicesExists.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭索引。
     * 
     * @param index 索引名字。
     * @return 执行结果模型。
     */
    public JestResult closeIndex(String index) {

        try {
            JestResult jestResult = jestClient.execute(new CloseIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 打开索引。
     * 
     * @param index 索引名字。
     * @return 执行结果模型。
     */
    public JestResult openIndex(String index) {

        try {
            JestResult jestResult = jestClient.execute(new OpenIndex.Builder(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建映射。
     * 
     * @param index   索引名字。
     * @param type    索引类型。
     * @param mapping 映射创建脚本JSON字符串。
     * @return 执行结果模型。
     */
    public JestResult putMappting(String index, String type, String mapping) {

        try {
            JestResult jestResult = jestClient.execute(new PutMapping.Builder(index, type, mapping).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建映射。
     * 
     * @param index   索引名字。
     * @param type    索引类型。
     * @param mapping 映射创建脚本JSON模型。
     * @return 执行结果模型。
     */
    public JestResult putMappting(String index, String type, JsonObject mapping) {

        return this.putMappting(index, type, mapping.toString());
    }

    /**
     * 删除映射。
     * 
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult deleteMappting(String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new DeleteMapping.Builder(index, type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取映射。
     * 
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult getMappting(String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new GetMapping.Builder().addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建别名。
     * 
     * @param indexes 索引名字列表。
     * @param alias   别名。
     * @return 执行结果模型。
     */
    public JestResult addAliasMapping(List<String> indexes, String alias) {

        try {
            AddAliasMapping build = new AddAliasMapping.Builder(indexes, alias).build();
            JestResult jestResult = jestClient.execute(new ModifyAliases.Builder(build).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除别名。
     * 
     * @param indexes 索引名字列表。
     * @param alias   别名。
     * @return 执行结果模型。
     */
    public JestResult deleteAliasMapping(List<String> indexes, String alias) {

        try {
            RemoveAliasMapping build = new RemoveAliasMapping.Builder(indexes, alias).build();
            JestResult jestResult = jestClient.execute(new ModifyAliases.Builder(build).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 判断别名是否存在。
     * 
     * @param alias 别名。
     * @return 执行结果模型。
     */
    public JestResult aliasExists(String alias) {

        try {
            JestResult jestResult = jestClient.execute(new AliasExists.Builder().alias(alias).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取别名。
     * 
     * @param indexes 索引名字列表。
     * @param alias   别名。
     * @return 执行结果模型。
     */
    public JestResult getAliases(String index) {

        try {
            JestResult jestResult = jestClient.execute(new GetAliases.Builder().addIndex(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建模板。
     * 
     * @param name     模板名字。
     * @param template 模板创建脚本JSON字符串。
     * @return 执行结果模型。
     */
    public JestResult putTemplate(String name, String template) {

        try {
            JestResult jestResult = jestClient.execute(new PutTemplate.Builder(name, template).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建模板。
     * 
     * @param name     模板名字。
     * @param template 模板创建脚本JSON模型。
     * @return 执行结果模型。
     */
    public JestResult putTemplate(String name, JsonObject template) {

        return this.putTemplate(name, template.toString());
    }

    /**
     * 删除模板。
     * 
     * @param name 模板名字。
     * @return 执行结果模型。
     */
    public JestResult deleteTemplate(String name) {

        try {
            JestResult jestResult = jestClient.execute(new DeleteTemplate.Builder(name).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取模板。
     * 
     * @param name 模板名字。
     * @return 执行结果模型。
     */
    public JestResult getTemplate(String name) {

        try {
            JestResult jestResult = jestClient.execute(new GetTemplate.Builder(name).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 插入文档。
     * 
     * @param doc   文档。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult insertDocuments(List<Map<String, Object>> docs, String index, String type) {

        try {
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index);
            for (Map<String, Object> doc : docs) {
                Index action;
                if (doc.get("id") != null) {
                    action = new Index.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build();
                    bulk.addAction(action);
                } else {
                    action = new Index.Builder(doc).index(index).build();
                    bulk.addAction(action);
                }
            }
            JestResult jestResult = jestClient.execute(bulk.build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 插入文档。
     * 
     * @param doc   文档。
     * @param id    文档ID。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult insertDocument(Map<String, Object> doc, String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Index.Builder(doc).id(id).index(index).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 插入文档。
     * 
     * @param doc   文档。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult insertDocument(Map<String, Object> doc, String index, String type) {

        try {
            if (doc.get("id") != null) {
                JestResult jestResult = jestClient
                        .execute(new Index.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build());
                if (!jestResult.isSucceeded()) {
                    throw new Exception(jestResult.getErrorMessage());
                }
                return jestResult;
            } else {
                JestResult jestResult = jestClient.execute(new Index.Builder(doc).index(index).type(type).build());
                if (!jestResult.isSucceeded()) {
                    throw new Exception(jestResult.getErrorMessage());
                }
                return jestResult;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文档。
     * 
     * @param doc   文档。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult updateDocuments(List<Map<String, Object>> docs, String index, String type) {

        try {
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index);
            for (Map<String, Object> doc : docs) {
                Update action;
                if (doc.get("id") != null) {
                    action = new Update.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build();
                    bulk.addAction(action);
                }
            }
            JestResult jestResult = jestClient.execute(bulk.build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文档。
     * 
     * @param doc   文档。
     * @param id    文档ID。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult updateDocument(Map<String, Object> doc, String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Update.Builder(doc).id(id).index(index).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文档。
     * 
     * @param doc   文档。
     * @param id    文档ID。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult updateDocument(Map<String, Object> doc, String index, String type) {

        try {
            if (doc.get("id") != null) {
                JestResult jestResult = jestClient
                        .execute(new Update.Builder(doc).id(doc.get("id").toString()).index(index).type(type).build());
                if (!jestResult.isSucceeded()) {
                    throw new Exception(jestResult.getErrorMessage());
                }
                return jestResult;
            }
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 更新文档。
     * 
     * @param query 查询脚本JSON字符串。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult updateDocumentByQuery(String query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new UpdateByQuery.Builder(query).addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    /**
     * 更新文档。
     * 
     * @param query 查询脚本JSON模型。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult updateDocumentByQuery(JsonObject query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new UpdateByQuery.Builder(query.toString()).addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除文档。
     * 
     * @param ids   文档ID。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult deleteDocuments(String[] ids, String index, String type) {

        try {
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            for (String id : ids) {
                Delete action = new Delete.Builder(id).build();
                bulk.addAction(action);
            }
            JestResult jestResult = jestClient.execute(bulk.build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除文档。
     * 
     * @param id    文档ID。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult deleteDocument(String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Delete.Builder(id).index(index).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除文档。
     * 
     * @param query 查询脚本。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult deleteDocumentByQuery(String query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new DeleteByQuery.Builder(query).addIndex(index).addType(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除文档。
     * 
     * @param query 查询脚本JSON模型。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 执行结果模型。
     */
    public JestResult deleteDocumentByQuery(JsonObject query, String index, String type) {

        try {
            JestResult jestResult = jestClient
                    .execute(new DeleteByQuery.Builder(query.toString()).addIndex(index).addType(index).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取指定ID的文档。
     * 
     * @param ids   文档ID集合。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 指定ID的文档JSON模型集合。
     */
    public List<JsonObject> getDocumentsByIds(List<String> ids, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new MultiGet.Builder.ById(index, type).addId(ids).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            List<JsonObject> docs = jestResult.getSourceAsObjectList(JsonObject.class);
            return docs;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取指定ID。
     * 
     * @param id    文档ID。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 指定ID的文档JSON模型。
     */
    public JsonObject getDocumentsById(String id, String index, String type) {

        try {
            JestResult jestResult = jestClient.execute(new Get.Builder(index, id).type(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            JsonObject doc = jestResult.getSourceAsObject(JsonObject.class);
            return doc;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取文档数量。
     * 
     * @param search 查询脚本JSON字符串。
     * @param index  索引名字。
     * @param type   索引类型。
     * @return 文档数量。
     */
    public Double getDocumentCount(String query, String index, String type) {

        try {
            CountResult jestResult = jestClient
                    .execute(new Count.Builder().query(query).addIndex(index).addType(type).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult.getCount();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取文档数量。
     * 
     * @param query 查询脚本JSON模型。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 文档数量。
     */
    public Double getDocumentCount(JsonObject query, String index, String type) {

        return this.getDocumentCount(query.toString(), index, type);
    }

    /**
     * 查询文档。
     * 
     * @param query 查询脚本JSON字符串。
     * @param from  页起点。
     * @param size  页长度。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 查询结果模型。
     */
    public SearchResult search(String query, Integer from, Integer size, String index, String type) {

        try {
            Builder builder = new Search.Builder(query).addIndex(index).addType(type)
                    .setParameter(Parameters.SIZE, size != null && size > 0 ? size : 15)
                    .setParameter(Parameters.FROM, from != null && from > 0 ? from : 0);
            SearchResult result = jestClient.execute(builder.build());
            if (!result.isSucceeded()) {
                throw new Exception(result.getErrorMessage());
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 查询文档。
     * 
     * @param query 查询脚本JSON模型。
     * @param from  页起点。
     * @param size  页长度。
     * @param index 索引名字。
     * @param type  索引类型。
     * @return 查询结果模型。
     */
    public SearchResult search(JsonObject query, Integer from, Integer size, String index, String type) {

        return this.search(query.toString(), from, size, index, type);
    }

    /**
     * 向后滚动查询文档。
     * 
     * @param scrollId 向后滚动ID。
     * @param scroll   有效时间,如:"5m"。
     * @return 查询结果模型。
     */
    public JestResult search(String scrollId, String scroll) {

        try {
            JestResult jestResult = jestClient.execute(new SearchScroll.Builder(scrollId, scroll).build());
            if (!jestResult.isSucceeded()) {
                throw new Exception(jestResult.getErrorMessage());
            }
            return jestResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 向后滚动查询文档。
     * 
     * @param scrollId 向后滚动ID。
     * @return 查询结果模型。
     */
    public JestResult search(String scrollId) {

        return this.search(scrollId, "5m");
    }

    /**
     * 实例回收方法。
     */
    @Override
    protected void finalize() throws Throwable {

        if (this.jestClient != null) {
            this.jestClient.close();
        }
        super.finalize();
    }
}

4、编写主程序。

1)编写 Elasticsearch 连接文件 "elasticsearch.properties" 。

es.serverUri=http://192.168.216.128:9200
es.multiThreaded=true
es.connTimeout=10000
es.readTimeout=10000
es.dateFormat=yyyy-MM-dd HH:mm:ss
es.user=elastic
es.pass=elastic

2)编写主程序文件 "JestApp.java"。

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ResourceBundle;

public class JestApp {
    
    public static void main( String[] args ) {
        
        // 获取 Elasticsearch 连接文件信息
        ResourceBundle resource = ResourceBundle.getBundle("elasticsearch");
        
        String serverUri = resource.getString("serverUri");
        String multiThreaded = resource.getString("multiThreaded");
        String connTimeout = resource.getString("connTimeout");
        String readTimeout = resource.getString("readTimeout");
        String dateFormat = resource.getString("dateFormat");
        String user = resource.getString("user");
        String pass = resource.getString("pass");
        
        // 定义 Elasticsearch 连接工厂的实例
        SearchSessionFactory factory = new SearchSessionFactory();
        
        factory.setServerUri(serverUri);
        factory.setMultiThreaded(Boolean.valueOf(multiThreaded));
        factory.setConnTimeout(Integer.valueOf(connTimeout));
        factory.setReadTimeout(Integer.valueOf(readTimeout));
        factory.setDateFormat(dateFormat);
        factory.setUser(user);
        factory.setPass(pass);
        
        // 定义  Elasticsearch 连接会话
        SearchSession session = new SearchSession(factory);
        
        // 索引名称
        String index = "pgsql_index";
        
        // 索引文档类型,固定值
        String type = "_doc";
        
        // 文档ID
        String id = "document_id";
        
        // 文档内容,一个空文档。
        Map<String, Object> doc = new LinkedHashMap<String, Object>();
        
        // 创建索引
        session.createIndex(index);
        
        // 删除索引
        session.deleteIndex(index);
        
        // 验证索引是否存在
        session.indicesExists(index);
        
        // 打开索引
        session.openIndex(index);
        
        // 关闭索引
        session.closeIndex(index);
        
        // 插入文档
        session.insertDocument(doc, id, index, type);
        
        // 更新文档
        session.updateDocument(doc, id, index, type);
        
        // 删除文档
        session.deleteDocument(id, index, type);
        
        // 获取指定ID的文档
        session.getDocumentsById(id, index, type);
        
        // 查询全部数据的前10个文档
        String query = "{\"query\": {\"match_all\": {}}}";
        session.search(query, 0, 10, index, type);
    }
}

7.3.ELK + Jest 最佳集成方案

Elasticsearch 作为 OLAP 数据库,它的数据一般都是来源于业务系统的持久化数据库中,如:PostgreSQL、MySQL、Oracle、SQLServer 等 RDBMS 数据库或 MongoDB 等 NoSQL数据库。基于类似场景,建议通过以下方式来完成 Elasticsearch、Kibana、Logstash、Jest 的集成使用:

1、使用 Kibana 创建和管理 Elasticsearch 模板。

Elasticsearch 支持通过模板来定义索引格式,模板可以应用于索引名称中包含指定字符的索引,它实现在创建索引时,将索引中包含指定字符的属性设置为特定的格式。使用 Kibana 在 Elasticsearch 中一次性创建模板。例如:

PUT _template/pgsql_template
{
  "version": 1,
  "order": 1,
  "index_patterns": [
    "pgsql-*"
  ],
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "ik_smart_analyzer": {
            "tokenizer": "ik_smart"
          }
        }
      },
      "number_of_shards": "3",
      "number_of_replicas": "3",
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "dynamic_templates": [
      {
        "tag_field": {
          "mapping": {
            "norms": false,
            "type": "keyword"
          },
          "match_mapping_type": "string",
          "match": "*_tag"
        }
      },
      {
        "msg_field": {
          "mapping": {
            "anaylzer": "ik_smart_analyzer",
            "norms": false,
            "type": "text"
          },
          "match_mapping_type": "string",
          "match": "*_msg"
        }
      },
      {
        "str_fields": {
          "mapping": {
            "anaylzer": "ik_smart_analyzer",
            "norms": false,
            "type": "text",
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            }
          },
          "match_mapping_type": "string",
          "match": "*"
        }
      }
    ]
  }
}

在上面的模板设置中,它设置了以下内容:

1)"index_patterns" 中定义将索引名称前缀为 "pgsql-" 的索引在创建时应用此模板。

2) "settings" 中定义了索引使用的分词器、分片和刷新时间等参数。

3) "mappings" 中定义了以下规则:

  • 名称以 "_tag" 结尾的属性,其类型为不支持分词的 keyword 数组;
  • 名称以 "_msg" 结尾的属性,其类型为支持中文分词、拼音分词的文本,且对全部分词进行索引;
  • 其他所有属性,其类型为支持中文分词、拼音分词的 text 文本,但只对前256个字符进行索引。

2、使用 Logstash 创建索引和插入、更新文档。

Logstash 可以高效的从数据库层面完成从业务数据库抽取数据,经过简单的过滤处理之后,向 Elasticsearch 中根据文档ID来插入或更新文档,但不能物理删除文档。在插入文档时,如果索引不存在则自动创建索引。

3、使用 Jest 删除索引、文档。

因为 Logstash 不能完成物理删除文档的处理,因此应用在业务数据库中进行物理数据删除操作时,需要使用 Jest 调用 API 同步将这些数据在 Elasticsearch 中删除。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容