一、Logstash简介
Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。
Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。
输入
:采集各种样式、大小和来源的数据
数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
过滤器
:实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:
利用 Grok 从非结构化数据中派生出结构
从 IP 地址破译出地理坐标
将 PII(个人验证信息) 数据匿名化,完全排除敏感字段
整体处理不受数据源、格式或架构的影响
输出
:选择你的存储,导出你的数据
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。
Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
logstash是做数据采集的,类似于flume。
Logstash架构:
Batcher负责批量的从queue中取数据;
Queue分类:
In Memory : 无法处理进程Crash、机器宕机等情况,会导致数据丢失
Persistent Queue In Disk:可处理进程Crash等情况,保证数据不丢失,保证数据至少消费一次,充当缓冲区,可以替代kafka等消息队列的作用。
官网:https://www.elastic.co/cn/products/logstash
二、下载安装
- 下载安装
# 下载
[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
# 解压
[root@localhost ~]# tar -zxvf logstash-6.4.3.tar.gz -C /usr/local
# 查看内容
[root@localhost ~]# ls /usr/local/logstash-6.4.3/
# 测试logstash-6.4.3
[root@localhost logstash-6.4.3]# ./bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Sending Logstash logs to /usr/local/logstash-6.4.3/logs which is now configured via log4j2.properties
[2019-09-25T15:12:41,903][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-09-25T15:12:42,613][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.4.3"}
[2019-09-25T15:12:45,490][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-09-25T15:12:45,845][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x310540d4 run>"}
The stdin plugin is now waiting for input:
[2019-09-25T15:12:46,543][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-09-25T15:12:47,020][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
如果启动成功会出现一下的提示语句:
接着屏幕就等着你输入了,比如输入一个Hello World,会出现以下的提示语句。
HelloWorld
{
"@timestamp" => 2019-09-25T07:14:40.491Z,
"host" => "localhost",
"@version" => "1",
"message" => "HelloWorld"
}
- 配置文件简单测试
pipeline配置简介:
Pipeline用于配置input、filter和output插件
input {
...
}
filter {
...
}
output {
...
}
创建配置文件logstash.conf:
[root@localhost logstash-6.4.3]# vi config/logstash.conf
# 输入内容
input {
stdin { }
}
output {
stdout {
codec => rubydebug { }
}
elasticsearch {
hosts => ["0.0.0.0:9200"]
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
# 启动elasticsearch
# 指定配置文件启动
[root@localhost logstash-6.4.3]# ./bin/logstash -f config/logstash.conf
# 同样命令行等着你输入指令
Hello World
{
"@version" => "1",
"host" => "localhost",
"@timestamp" => 2019-09-25T07:25:03.292Z,
"message" => "Hello World"
}
访问:
http://192.168.77.132:9200/_search?q=Hello
常见问题:
- 错误提示:Unrecognized VM option 'UseParNewGC'
JDK版本不正确。 - (LoadError) Unsupported platform: x86_64-linux
切换JDK为1.8版本
# 安装JDK
[root@localhost ~]# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# 切换JDK
[root@localhost logstash-6.4.3]# alternatives --config java
共有 2 个提供“java”的程序。
选项 命令
-----------------------------------------------
+ 1 java-11-openjdk.x86_64 (/usr/lib/jvm/java-11-openjdk-11.0.4.11-1.el7_7.x86_64/bin/java)
* 2 java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java)
按 Enter 保留当前选项[+],或者键入选项编号:2
- error=>"Got response code '401' contacting Elasticsearch at URL 'http://0.0.0.0:9200/'"
原因:
没有指定Elasticsearch 权限,修改配置添加elasticsearch 的用户与密码:
output {
stdout {
codec => rubydebug { }
}
elasticsearch {
hosts => ["0.0.0.0:9200"]
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
三、采集MySql数据库数据
- 下载MySql-JDBC
MySql-JDBC下载地址:https://dev.mysql.com/downloads/connector/j/5.1.html
5.1下载:
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
解压:
[root@localhost ~]# tar -zxvf mysql-connector-java-5.1.48.tar.gz
8.0下载地址:
https://dev.mysql.com/downloads/connector/j/8.0.html
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-8.0.17.tar.gz
- 安装MySql-JDBC插件
# 安装插件
[root@localhost logstash-6.4.3]# ./bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
-
创建数据库、表、与sql文件
数据库:log_test 表:news
数据脚本:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for news
-- ----------------------------
DROP TABLE IF EXISTS `news`;
CREATE TABLE `news` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '标题',
`content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '内容',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
创建执行SQL:
# 创建目录
[root@localhost logstash-6.4.3]# mkdir sql
# 添加内容
[root@localhost sql]# vi sql/jdbc.sql
# 内容为
select * from news
要注意:这里的内容就是logstash依赖执行的sql命令,所以这里的表名要跟你实际的数据库表名一致,否则会失败。
至于jdbc.sql的内容就是你的业务sql,只能有一条,且末尾不要加分号,否则出错!
- 单数据源同步
单数据源同步是指,数据只写入一个index下(注意:6.x版本下,一个index下只能有一个type),jdbc块和elasticsearch块也是 一 一 对应的关系,具体看下同步conf的配置(示例配置文件名称: singledb.conf):
# 创建配置文件
[root@localhost logstash-6.4.3]# vi config/singledb.conf
# 内容
input {
stdin {
}
jdbc {
# mysql jdbc connection 连接地址
jdbc_connection_string => "jdbc:mysql://10.2.129.166:3306/log_test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
# 登录数据库的用户名、密码
jdbc_user => "root"
jdbc_password => "1234"
# jdbc 驱动包路径
jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar" # 连接驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc.sql"
# 以下表示定时执行任务,使用cron表达式,本文只全量同步一次,所以不配置定时器,如果要实现增量更新,需要配合定时器以及上次查询的最后一个值,具体要根据你的业务来定。
# schedule => "* * * * *"
type => "jdbc"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["192.168.77.132:9200"]
index => "mynews"
document_id => "%{id}"
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
stdout {
codec => json_lines
}
}
# 复制jdbc jar 文件
[root@localhost logstash-6.4.3]# cp /root/mysql-connector-java-8.0.17/mysql-connector-java-8.0.17.jar config
# 启动同步
[root@localhost logstash-6.4.3]# ./bin/logstash -f config/singledb.conf
注意:配置文件中的jar与sql一定要使用绝对路径。
启动同步,同步的内容会输出到屏幕上。执行完成后可在es中查看数据。
- 多数据源同步
多数据源同步是指,需要同步多种类型的数据到es中,input的配置添加相应的jdbc模块,output中根据type类型判断添加对应的elasticsearch模块:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
jdbc_user => "root"
jdbc_password => "1234"
# 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc0.sql"
type => "user"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
jdbc_user => "root"
jdbc_password => "1234"
# 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc1.sql"
type => "user_address"
}
}
output {
#设置窗口日志输出
stdout {
codec => json_lines
}
# 与JDBC定义的type对应
if[type] == "user" {
elasticsearch {
hosts => ["192.168.77.132:9200"]
# 注意index的值不支持大写字母
index => "user"
# document_type自行设置,不设置时,默认为doc
# document_type => ""
# 此处的值来自查询sql中的列名称,根据需要自行配置
document_id => "%{id}"
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
if[type] == "user_news" {
elasticsearch {
hosts => ["192.168.77.132:9200"]
# 注意index的值不支持大写字母
index => "user_news"
# document_type自行设置,不设置时,默认为doc
# document_type => ""
# 此处的值来自查询sql中的列名称,根据需要自行配置
document_id => "%{id}"
user => elastic
password => xW9dqAxThD5U4ShQV1JT
}
}
}
全量同步
以上的单数据源/多数据源同步都是全量同步,即没有任何条件地进行同步。增量同步
增量同步需要在jdbc模块添加相应的增量配置
配置参数
Schedule:是cron格式的同步周期,其它几个都是用来记录同步增量指标的,
Tracking_column:是数据库中的增量指标字段名
Tracking_columu_type:目前只支持两种numeric,timestamp,
Last_run_metadata_path:是保存上次同步的增量指标值。
而 :sql_last_value如果input里面use_column_value => true, 即如果设置为true的话,可以是我们设定的字段的上一次的值。
默认 use_column_value => false, 这样 :sql_last_value为上一次更新的最后时刻值。
也就是说,对于新增的值,才会更新。这样就实现了增量更新的目的。
四、相关配置
- 持久队列基本配置(pipelines.yml)
queue.type:persisted # 默认是memory
queue.max_bytes:4gb # 队列存储最大数据量
- 线程相关配置(logstash.yml)
pipeline.worksers | -w
# pipeline线程数,即filter_output的处理线程数,默认是cpu核数
pipeline.batch.size | -b
# Batcher一次批量获取的待处理文档数,默认是125,可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整
pipeline.batch.delay | -u
# Batcher等待的时长,单位为ms
- Logstash配置文件:
logstash设置相关的配置文件(在conf文件夹中)
logstash.yml:logstash相关配置,比如node.name、path.data、pipeline.workers、queue.type等,这其中的配置可以被命令行参数中的相关参数覆盖
jvm.options:修改jvm的相关参数,比如修改heap size等
pipeline配置文件:定义数据处理流程的文件,以.conf结尾
logstash.yml配置项:
node.name: 节点名称,便于识别
path.data: 持久化存储数据的文件夹,默认是logstash home目录下的data
path.config: 设定pipeline配置文件的目录(如果指定文件夹,会默认把文件夹下的所有.conf文件按照字母顺序拼接为一个文件)
path.log: 设定pipeline日志文件的目录
pipeline.workers: 设定pipeline的线程数(filter+output),优化的常用项
pipeline.batch.size/delay: 设定批量处理数据的数据和延迟
queue.type: 设定队列类型,默认是memory
queue.max_bytes: 队列总容量,默认是1g
五、常见问题
- Error: Java::JavaSql::SQLException: null, message from server: "Host 'DESKTOP-T92R1EE' is not allowed to connect to this MySQL server"
数据库没有开启远程连接
select user,host from user;
# 在Mysql数据库中执行以下sql
update user set host='%' where user ='root';
flush privileges;
最好重启下Mysql