参考文章:
https://www.elastic.co/guide/en/logstash/6.5/installing-logstash.html
rpm下载地址:https://www.elastic.co/downloads/logstash
logstash实践: 分布式系统的日志监控
windows安装logstash6.2.3
前提
linux系统jdk已安装部署完毕
1.下载 logstash的rpm安装包
https://www.elastic.co/cn/downloads/logstash(官网)
https://www.newbe.pro/Mirrors/Mirrors-Logstash/
(国内加速下载)
安装 logstash:
[root@VMTest soft]# rpm -ivh logstash-6.5.4.rpm
2.修改配置文件
#创建日志和数据目录
# mkdir -p /data/logs/logstash
# mkdir -p /data/logstash_data
# chown -R logstash:logstash /data/logs/logstash/
# chown -R logstash:logstash /data/logstash_data/
# 修改如下内容为自己的内容
[root@VMTest soft]# vi /etc/logstash/logstash.yml
#配置时一定要执行:
# chown -R logstash:logstash /data/logs/logstash/
# chown -R logstash:logstash /data/logstash_data/
path.data: /data/logstash_data
path.logs: /data/logs/logstash
#其它字段
# path.config:
# logstash每隔一段时间会自动重新加载conf文件,reload配置文件的时间间隔
# config.reload.interval: 3s
# config.debug: false
# queue.type: memory
# path.queue:
# queue.page_capacity: 64mb
# queue.max_bytes: 1024mb
# queue.checkpoint.acks: 1024
# dead_letter_queue.max_bytes: 1024mb
# http.host: "主机地址"
# http.port: 9600-9700
3.启停服务
#============================================================#
[root@VMTest ~]# systemctl enable logstash # 开机启动
[root@VMTest ~]# systemctl start logstash # 启动
[root@VMTest ~]# systemctl status logstash
[root@VMTest ~]# systemctl stop logstash
[root@VMTest ~]# systemctl restart logstash
#说明:
执行完systemctl start logstash命令, 启动完成后,
会自动执行/etc/logstash/conf.d/文件夹下的.conf文件
#============================================================#
#---------------- 以下是压缩包启动logstash的操作 ---------------#
#============================================================#
# 检测配置文件语法是否有问题
logstash -f /etc/logstash/conf.d/system.conf -t
# 先在Indexer主机上启动
[root@VMTest soft]# nohup /usr/local/logstash-1.4.3/bin/logstash agent -f indexer.conf &>/dev/null &
# 再在Shipper主机上启动
[root@VMTest soft]# nohup /usr/local/logstash-1.4.3/bin/logstash agent -f shipper.conf &>/dev/null &
# 最后在Indexer上观察日志
[root@VMTest soft]# tail -f /data/log/logstash/all.log
#运行logstash
#控制台采集数据,控制台输出数据
[root@VMTest soft]# cd /app/logstash
[root@VMTest soft]# bin/logstash -e 'input { stdin { } } output { stdout {} }'
#读取配置文件启动
[root@VMTest soft]# logstash -f logstash_default.conf
4.创建logstash 的配置文件
[root@elk ~]# cat /etc/logstash/conf.d/01-logstash-initial.conf
#================================= input kafka, output: es 示例 =============================#
input {
kafka {
bootstrap_servers => ["10.101.15.163:9092"]
group_id => "deepcogni_qa"
topics => ["deepcogni"]
consumer_threads => 5
decorate_events => true
codec => "json"
}
}
output {
elasticsearch {
hosts => ["10.101.15.163:9200"]
index => "deepcogni:qa"
codec => "json"
}
}
#================================= input logfile, output es =============================#
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
#beats {
# port => 5044
#}
file {
path => "/var/log/httpd/access_log"
start_position => beginning
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][logstash]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
#================================= input: log file, output: redis =============================#
input {
file {
path => [
# 这里填写需要监控的文件
"/data/log/php/php_fetal.log",
"/data/log/service1/access.log"
]
}
}
output {
# 输出到控制台
# stdout { }
# 输出到redis
redis {
host => "10.140.45.190" # redis主机地址
port => 6379 # redis端口号
db => 8 # redis数据库编号
data_type => "channel" # 使用发布/订阅模式
key => "logstash_list_0" # 发布通道名称
}
}
#================================= input: redis, output: log file =============================#
input {
redis {
host => "10.140.45.190" # redis主机地址
port => 6379 # redis端口号
db => 8 # redis数据库编号
data_type => "channel" # 使用发布/订阅模式
key => "logstash_list_0" # 发布通道名称
}
}
output {
file {
path => "/data/log/logstash/all.log" # 指定写入文件路径
message_format => "%{host} %{message}" # 指定写入格式
flush_interval => 0 # 指定刷新间隔,0代表实时写入
}
}
#================================= input 样式系列 =============================#
input {
file {
codec => json
path => [
"/opt/build/*.json"
]
}
}
input {
file {
path => [
# 这里填写需要监控的文件
"/data/log/php/php_fetal.log",
"/data/log/service1/access.log"
]
}
}
input {
beats {
port => 5000
type => "logs"
ssl => true
ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
}
}
#================================= filter样式系列 =============================#
filter {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
}
filter {
if [type] == "syslog-beat" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
geoip {
source => "clientip"
}
syslog_pri {}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
#================================= output 样式系列 =============================#
output {
# 输出到控制台
# stdout { }
# 输出到redis
redis {
host => "10.140.45.190" # redis主机地址
port => 6379 # redis端口号
db => 8 # redis数据库编号
data_type => "channel" # 使用发布/订阅模式
key => "logstash_list_0" # 发布通道名称
}
}
#写入本地文件。
output {
file {
path => "/data/log/logstash/all.log" # 指定写入文件路径
message_format => "%{host} %{message}" # 指定写入格式
flush_interval => 0 # 指定刷新间隔,0代表实时写入
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200" #改成你的elasticsearch地址
}
}
output {
elasticsearch { }
stdout { codec => rubydebug }
}
5.logstash的一些配置
测试logstash,一个日志存储管道有两个必需的元素,输入和输出,以及一个可选的元素,过滤器。
输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,并且输出插件将数据写到目的地。
配置参考文件
file {
path => "/data/web/logstash/logFile/*/*"
start_position => "beginning" #从文件开始处读写
}
有一些比较有用的配置项,可以用来指定 FileWatch 库的行为:
discover_interval
logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
exclude
不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
sincedb_path
默认路径$HOME/.sincedb(Windows:C:\Windows\System32\config\systemprofile\.sincedb),
可以通过这个配置定义sincedb文件到其他位置。
sincedb_write_interval
logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
stat_interval
logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
start_position
logstash 从什么位置开始读取文件数据,默认是结束位置,
即 logstash 进程会以类似 tail -F 的形式运行。
如果要导入原有数据,把设定改成 "beginning",logstash 进程就从头开始读取,
有点类似cat,但是读到最后一行不会终止,而是继续变成 tail -F。
6. Filter配置
filter {
grok {
match => { "path" => "/usr/local/test/logs/%{DATA:fileName}.log"} #获取log文件名
}
if [message] =~ ".*Exception.*" { # 格式化日志信息
grok {
match => { "message" => ".*%{LOGLEVEL:priority}.*uid=%{NUMBER:uid},traceId=%{DATA:traceId} .*\(%{DATA:method}@%{DATA:class}\.java:%{NUMBER:line}\)" }
}
}
else {
grok {
mutate { replace => { "type" => "apache_access" } }
match => {" message " => "\s*\[impl\]\[in\]requestId=%{NUMBER:reqId},reqTime=%{NUMBER:reqTime} req=%{GREEDYDATA:req}" } # 会自动生成相应的JSON的_source的key-value
}
}
}
7. 指定Exeception异常提醒
filter {
if [message] =~ "\s*(Arithmetic|Runtime)Exception\s*" {
ruby {
code => “”
add_tag => "specific_exeception"
}
}
grok {
match => { "message" => "\s*\[impl\]\[in\] traceId=%{NUMBER:traceId},reqTime=%{NUMBER:reqTime} req=%{GREEDYDATA:req}" }
}
if [priority] == "ERROR" {
grok {
tag_on_failure => "error"
}
}
}
}
8. 预警 发送email提醒
# input 与 filter 参考上面的
output {
# 如果event中标记为“error”,表示出错
if "error" in [tags] {
email {
from => "shyn_xxxxx@163.com"
to => "123456@qq.com"
via => "smtp"
port => 25
subject => "[%{@timestamp} xxx服务器 日志发现异常!]"
address => "smtp.163.com"
domain => "smtp.163.com"
body => "u have new bug ! %{message}"
username => "shyn_xxxxx@163.com"
password => "password"
}
}
stdout { codec => rubydebug }
}
9. 预警及发送http提醒
input {
# 略
}
filter {
grok {
match => { "message" => "\s*\[impl\]\[in\] traceId=%{NUMBER:traceId},reqTime=%{NUMBER:reqTime} req=%{GREEDYDATA:req}" }
# remove_field => [ "message" ]
}
if [priority] == "ERROR" { #是错误 添加标记
grok {
tag_on_failure => "error"
}
}
# 计数
metrics {
# 每60秒清空计数器 #should be a multiple of 5s
clear_interval =>60
# 每隔60秒统计一次 #Must be a multiple of 5s
flush_interval =>60
# 计数器数据保存的字段名 priority的值默认就有 是日志的级别
meter => "events_%{priority}"
# 增加"metric",作为标记
add_tag => "metric"
# 3秒内的message数据才统计,避免延迟
# ignore_older_than => 3
}
# 如果event中标记为“metric”的
if "metric" in [tags] {
# 执行ruby代码
ruby {
# 如果level为warn的数量小于3条,就忽略此事件(即不发送任何消息)。
code => "event.cancel if event['events_WARN']['count'] < 3"
}
}
}
output {
# 如果event中标记为“metric”,表示只发送计数的消息。
if "metric" in [tags]{
# 通过http请求公众号发送微信消息。(此处简略描述)
http {
http_method => "post"
url => "http://116.1.1.112:33333/xxx?count=%{[events_WARN][count]}"
}
}
stdout { codec => rubydebug } # 标准输出
# elasticsearch {
# action => "index" #The operation on ES
# hosts => "127.0.0.1:9344" #ElasticSearch host, can be array.
# index => "logstash-%{+YYYY.MM.dd}" #The index to write data to.
# }
}