数据采集--Logstash(七)

一、Logstash简介

  Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。


Logstash

  Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。


Logstash管道

输入:采集各种样式、大小和来源的数据
数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

输入

过滤器:实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:

利用 Grok 从非结构化数据中派生出结构
从 IP 地址破译出地理坐标
将 PII(个人验证信息) 数据匿名化,完全排除敏感字段
整体处理不受数据源、格式或架构的影响

输出:选择你的存储,导出你的数据
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。
Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

logstash是做数据采集的,类似于flume。
Logstash架构:


Logstash架构图

Batcher负责批量的从queue中取数据;
Queue分类:
In Memory : 无法处理进程Crash、机器宕机等情况,会导致数据丢失
Persistent Queue In Disk:可处理进程Crash等情况,保证数据不丢失,保证数据至少消费一次,充当缓冲区,可以替代kafka等消息队列的作用。

官网:https://www.elastic.co/cn/products/logstash

二、下载安装

  1. 下载安装
# 下载
[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
# 解压
[root@localhost ~]# tar -zxvf logstash-6.4.3.tar.gz -C /usr/local
# 查看内容
[root@localhost ~]# ls /usr/local/logstash-6.4.3/
# 测试logstash-6.4.3
[root@localhost logstash-6.4.3]# ./bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Sending Logstash logs to /usr/local/logstash-6.4.3/logs which is now configured via log4j2.properties
[2019-09-25T15:12:41,903][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-09-25T15:12:42,613][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.4.3"}
[2019-09-25T15:12:45,490][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-09-25T15:12:45,845][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x310540d4 run>"}
The stdin plugin is now waiting for input:
[2019-09-25T15:12:46,543][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-09-25T15:12:47,020][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

如果启动成功会出现一下的提示语句:


启动成功

接着屏幕就等着你输入了,比如输入一个Hello World,会出现以下的提示语句。

HelloWorld
{
    "@timestamp" => 2019-09-25T07:14:40.491Z,
          "host" => "localhost",
      "@version" => "1",
       "message" => "HelloWorld"
}

  1. 配置文件简单测试
    pipeline配置简介:
    Pipeline用于配置input、filter和output插件
input {
        ...
}
filter {
        ...
}
output {
        ...
}

创建配置文件logstash.conf:

[root@localhost logstash-6.4.3]# vi config/logstash.conf
# 输入内容

input {
    stdin { }
}

output {
    stdout {
        codec => rubydebug { }
    }
    elasticsearch {
        hosts => ["0.0.0.0:9200"]
        user => elastic
        password => xW9dqAxThD5U4ShQV1JT
    }
}

# 启动elasticsearch
# 指定配置文件启动
[root@localhost logstash-6.4.3]# ./bin/logstash -f config/logstash.conf
# 同样命令行等着你输入指令
Hello World
{
      "@version" => "1",
          "host" => "localhost",
    "@timestamp" => 2019-09-25T07:25:03.292Z,
       "message" => "Hello World"
}

访问:
http://192.168.77.132:9200/_search?q=Hello

测试访问

常见问题:

  1. 错误提示:Unrecognized VM option 'UseParNewGC'
    JDK版本不正确。
  2. (LoadError) Unsupported platform: x86_64-linux
    切换JDK为1.8版本
# 安装JDK
[root@localhost ~]# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# 切换JDK
[root@localhost logstash-6.4.3]# alternatives --config java  

共有 2 个提供“java”的程序。

  选项    命令
-----------------------------------------------
 + 1           java-11-openjdk.x86_64 (/usr/lib/jvm/java-11-openjdk-11.0.4.11-1.el7_7.x86_64/bin/java)
*  2           java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java)

按 Enter 保留当前选项[+],或者键入选项编号:2

  1. error=>"Got response code '401' contacting Elasticsearch at URL 'http://0.0.0.0:9200/'"
    原因:
    没有指定Elasticsearch 权限,修改配置添加elasticsearch 的用户与密码:
output {
    stdout {
        codec => rubydebug { }
    }
    elasticsearch {
        hosts => ["0.0.0.0:9200"]
        user => elastic
        password => xW9dqAxThD5U4ShQV1JT
    }
}
  1. YUM安装
    参考档安装:
    https://www.elastic.co/guide/en/logstash/6.4/installing-logstash.html#_yum

三、采集MySql数据库数据

  1. 下载MySql-JDBC
    MySql-JDBC下载地址:https://dev.mysql.com/downloads/connector/j/5.1.html
    MySql-JDBC下载地址

    5.1下载:
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
 解压:
[root@localhost ~]# tar -zxvf mysql-connector-java-5.1.48.tar.gz

8.0下载地址:
https://dev.mysql.com/downloads/connector/j/8.0.html

8.0下载地址

[root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-8.0.17.tar.gz

  1. 安装MySql-JDBC插件
#   安装插件
[root@localhost logstash-6.4.3]# ./bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful

  1. 创建数据库、表、与sql文件
    数据库:log_test 表:news


    数据库与表

    数据脚本:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for news
-- ----------------------------
DROP TABLE IF EXISTS `news`;
CREATE TABLE `news`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '标题',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '内容',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

创建执行SQL:

# 创建目录
[root@localhost logstash-6.4.3]# mkdir sql
# 添加内容
[root@localhost sql]# vi sql/jdbc.sql
# 内容为
select * from news

要注意:这里的内容就是logstash依赖执行的sql命令,所以这里的表名要跟你实际的数据库表名一致,否则会失败。
至于jdbc.sql的内容就是你的业务sql,只能有一条,且末尾不要加分号,否则出错!

  1. 单数据源同步
    单数据源同步是指,数据只写入一个index下(注意:6.x版本下,一个index下只能有一个type),jdbc块和elasticsearch块也是 一 一 对应的关系,具体看下同步conf的配置(示例配置文件名称: singledb.conf):
# 创建配置文件
[root@localhost logstash-6.4.3]# vi config/singledb.conf
# 内容
input {
    stdin {
    }
    jdbc {
      # mysql jdbc connection 连接地址
      jdbc_connection_string => "jdbc:mysql://10.2.129.166:3306/log_test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
      # 登录数据库的用户名、密码
      jdbc_user => "root"
      jdbc_password => "1234"
      # jdbc 驱动包路径
      jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"      # 连接驱动类名
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc.sql"
      # 以下表示定时执行任务,使用cron表达式,本文只全量同步一次,所以不配置定时器,如果要实现增量更新,需要配合定时器以及上次查询的最后一个值,具体要根据你的业务来定。
      # schedule => "* * * * *"
      type => "jdbc"
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
output {
    elasticsearch {
        hosts => ["192.168.77.132:9200"]
        index => "mynews"
        document_id => "%{id}"
        user => elastic
        password => xW9dqAxThD5U4ShQV1JT
    }
    stdout {
        codec => json_lines
    }
}

#  复制jdbc jar  文件 
[root@localhost logstash-6.4.3]# cp /root/mysql-connector-java-8.0.17/mysql-connector-java-8.0.17.jar config

# 启动同步
[root@localhost logstash-6.4.3]# ./bin/logstash -f config/singledb.conf

注意:配置文件中的jar与sql一定要使用绝对路径。
启动同步,同步的内容会输出到屏幕上。执行完成后可在es中查看数据。

同步结果

kibana中查看

结果
  1. 多数据源同步
    多数据源同步是指,需要同步多种类型的数据到es中,input的配置添加相应的jdbc模块,output中根据type类型判断添加对应的elasticsearch模块:
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
        jdbc_user => "root"
        jdbc_password => "1234"
        # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
        jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
        statement_filepath =>  "/usr/local/logstash-6.4.3/config/jdbc0.sql"
        type => "user"
    }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
        jdbc_user => "root"
        jdbc_password => "1234"
        # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
        jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
        statement_filepath =>  "/usr/local/logstash-6.4.3/config/jdbc1.sql"
        type => "user_address"
    }
}
output {
    #设置窗口日志输出
    stdout {
        codec => json_lines
    }
       # 与JDBC定义的type对应
    if[type] == "user" {
        elasticsearch {
            hosts => ["192.168.77.132:9200"]
            # 注意index的值不支持大写字母
            index => "user"
            # document_type自行设置,不设置时,默认为doc
            # document_type => ""
            # 此处的值来自查询sql中的列名称,根据需要自行配置
            document_id => "%{id}"
                       user => elastic
                       password => xW9dqAxThD5U4ShQV1JT
        }
    }
    if[type] == "user_news" {
        elasticsearch {
            hosts => ["192.168.77.132:9200"]
            # 注意index的值不支持大写字母
            index => "user_news"
            # document_type自行设置,不设置时,默认为doc
            # document_type => ""
            # 此处的值来自查询sql中的列名称,根据需要自行配置
            document_id => "%{id}"
                       user => elastic
                       password => xW9dqAxThD5U4ShQV1JT
        }
    }
    
}

  1. 全量同步
    以上的单数据源/多数据源同步都是全量同步,即没有任何条件地进行同步。

  2. 增量同步
    增量同步需要在jdbc模块添加相应的增量配置

配置参数
Schedule:是cron格式的同步周期,其它几个都是用来记录同步增量指标的,
Tracking_column:是数据库中的增量指标字段名
Tracking_columu_type:目前只支持两种numeric,timestamp,
Last_run_metadata_path:是保存上次同步的增量指标值。
而 :sql_last_value如果input里面use_column_value => true, 即如果设置为true的话,可以是我们设定的字段的上一次的值。
默认 use_column_value => false, 这样 :sql_last_value为上一次更新的最后时刻值。
也就是说,对于新增的值,才会更新。这样就实现了增量更新的目的。

四、相关配置

  1. 持久队列基本配置(pipelines.yml)
queue.type:persisted    # 默认是memory
queue.max_bytes:4gb     # 队列存储最大数据量
  1. 线程相关配置(logstash.yml)
pipeline.worksers | -w
# pipeline线程数,即filter_output的处理线程数,默认是cpu核数
pipeline.batch.size | -b
# Batcher一次批量获取的待处理文档数,默认是125,可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整
pipeline.batch.delay | -u
# Batcher等待的时长,单位为ms

  1. Logstash配置文件:
    logstash设置相关的配置文件(在conf文件夹中)
    logstash.yml:logstash相关配置,比如node.name、path.data、pipeline.workers、queue.type等,这其中的配置可以被命令行参数中的相关参数覆盖
    jvm.options:修改jvm的相关参数,比如修改heap size等
    pipeline配置文件:定义数据处理流程的文件,以.conf结尾
    logstash.yml配置项:
node.name:   节点名称,便于识别
path.data:   持久化存储数据的文件夹,默认是logstash home目录下的data
path.config: 设定pipeline配置文件的目录(如果指定文件夹,会默认把文件夹下的所有.conf文件按照字母顺序拼接为一个文件)
path.log:    设定pipeline日志文件的目录
pipeline.workers: 设定pipeline的线程数(filter+output),优化的常用项
pipeline.batch.size/delay: 设定批量处理数据的数据和延迟
queue.type: 设定队列类型,默认是memory
queue.max_bytes: 队列总容量,默认是1g

五、常见问题

  1. Error: Java::JavaSql::SQLException: null, message from server: "Host 'DESKTOP-T92R1EE' is not allowed to connect to this MySQL server"
    数据库没有开启远程连接
select user,host from user;
# 在Mysql数据库中执行以下sql
update user set host='%' where user ='root';
flush privileges;

最好重启下Mysql


执行SQL
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容