ELK-安装 logstash日志收集

参考文章:
https://www.elastic.co/guide/en/logstash/6.5/installing-logstash.html
rpm下载地址:https://www.elastic.co/downloads/logstash
logstash实践: 分布式系统的日志监控
windows安装logstash6.2.3

前提

linux系统jdk已安装部署完毕

1.下载 logstash的rpm安装包

https://www.elastic.co/cn/downloads/logstash(官网)
https://www.newbe.pro/Mirrors/Mirrors-Logstash/
(国内加速下载)

安装 logstash:

[root@VMTest soft]# rpm -ivh logstash-6.5.4.rpm

2.修改配置文件

#创建日志和数据目录
# mkdir -p /data/logs/logstash
# mkdir -p /data/logstash_data
# chown -R logstash:logstash /data/logs/logstash/
# chown -R logstash:logstash /data/logstash_data/
 
# 修改如下内容为自己的内容
[root@VMTest soft]# vi /etc/logstash/logstash.yml
#配置时一定要执行:
#  chown -R logstash:logstash /data/logs/logstash/
#  chown -R logstash:logstash /data/logstash_data/
path.data: /data/logstash_data
path.logs: /data/logs/logstash
 
#其它字段
# path.config:
# logstash每隔一段时间会自动重新加载conf文件,reload配置文件的时间间隔
# config.reload.interval: 3s
# config.debug: false
# queue.type: memory
# path.queue:
# queue.page_capacity: 64mb
# queue.max_bytes: 1024mb
# queue.checkpoint.acks: 1024
# dead_letter_queue.max_bytes: 1024mb
 
# http.host: "主机地址"
# http.port: 9600-9700

3.启停服务


#============================================================#
[root@VMTest ~]# systemctl enable logstash   # 开机启动
[root@VMTest ~]# systemctl start logstash    # 启动
[root@VMTest ~]# systemctl status logstash  
[root@VMTest ~]# systemctl stop logstash  
[root@VMTest ~]# systemctl restart logstash  

#说明:
执行完systemctl start logstash命令, 启动完成后,
会自动执行/etc/logstash/conf.d/文件夹下的.conf文件

#============================================================#
#---------------- 以下是压缩包启动logstash的操作 ---------------#
#============================================================#

# 检测配置文件语法是否有问题
logstash -f /etc/logstash/conf.d/system.conf -t                 

# 先在Indexer主机上启动
[root@VMTest soft]# nohup /usr/local/logstash-1.4.3/bin/logstash agent -f indexer.conf &>/dev/null &
# 再在Shipper主机上启动
[root@VMTest soft]# nohup /usr/local/logstash-1.4.3/bin/logstash agent -f shipper.conf &>/dev/null &
# 最后在Indexer上观察日志
[root@VMTest soft]# tail -f /data/log/logstash/all.log

#运行logstash
#控制台采集数据,控制台输出数据
[root@VMTest soft]# cd /app/logstash
[root@VMTest soft]# bin/logstash -e 'input { stdin { } } output { stdout {} }'
#读取配置文件启动
[root@VMTest soft]# logstash -f logstash_default.conf

4.创建logstash 的配置文件

[root@elk ~]# cat /etc/logstash/conf.d/01-logstash-initial.conf
 
#================================= input kafka, output: es 示例 =============================#
input {
    kafka {
        bootstrap_servers => ["10.101.15.163:9092"]
            group_id => "deepcogni_qa"
            topics => ["deepcogni"]
            consumer_threads => 5
            decorate_events => true
            codec => "json"
        }
}
output {
    elasticsearch {
        hosts => ["10.101.15.163:9200"]
        index => "deepcogni:qa"
        codec => "json"
   }
}
 
 
#================================= input logfile, output es =============================#
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
 
input {
  #beats {
   # port => 5044
  #}
  file {
    path => "/var/log/httpd/access_log"
    start_position => beginning
  }
}
 
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][logstash]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}
#================================= input: log file, output: redis =============================#
input {
    file {
        path => [
            # 这里填写需要监控的文件
            "/data/log/php/php_fetal.log",
            "/data/log/service1/access.log"
        ]
    }
}
 
output {
    # 输出到控制台
    # stdout { }
 
    # 输出到redis
    redis {
        host => "10.140.45.190"   # redis主机地址
        port => 6379              # redis端口号
        db => 8                   # redis数据库编号
        data_type => "channel"    # 使用发布/订阅模式
        key => "logstash_list_0"  # 发布通道名称
    }
}
#================================= input: redis, output: log file  =============================#
input {
    redis { 
        host      => "10.140.45.190"    # redis主机地址
        port      => 6379               # redis端口号
        db        => 8                  # redis数据库编号
        data_type => "channel"          # 使用发布/订阅模式
        key       => "logstash_list_0"  # 发布通道名称
    } 
}
output { 
    file { 
        path           => "/data/log/logstash/all.log" # 指定写入文件路径
        message_format => "%{host} %{message}"         # 指定写入格式
        flush_interval => 0                            # 指定刷新间隔,0代表实时写入
    }
}
#================================= input 样式系列 =============================#
input {
    file {
        codec => json
        path => [
            "/opt/build/*.json"
        ]
    }
}
input {
    file {
        path => [
            # 这里填写需要监控的文件
            "/data/log/php/php_fetal.log",
            "/data/log/service1/access.log"
        ]
    }
}
input {
  beats {
    port => 5000
    type => "logs"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}
 
#================================= filter样式系列 =============================#
filter { 
  grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }    
}
 
 
filter {
  if [type] == "syslog-beat" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    geoip {
      source => "clientip"
    }
    syslog_pri {}
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
 
#================================= output 样式系列 =============================#
output {
    # 输出到控制台
    # stdout { }
 
    # 输出到redis
    redis {
        host => "10.140.45.190"   # redis主机地址
        port => 6379              # redis端口号
        db => 8                   # redis数据库编号
        data_type => "channel"    # 使用发布/订阅模式
        key => "logstash_list_0"  # 发布通道名称
    }
}
 
#写入本地文件。
output { 
    file { 
        path           => "/data/log/logstash/all.log" # 指定写入文件路径
        message_format => "%{host} %{message}"         # 指定写入格式
        flush_interval => 0                            # 指定刷新间隔,0代表实时写入
    }
}
 
output {
  elasticsearch { 
      hosts => "elasticsearch:9200" #改成你的elasticsearch地址 
  }
}
 
output {
  elasticsearch { }
  stdout { codec => rubydebug }
}

5.logstash的一些配置

测试logstash,一个日志存储管道有两个必需的元素,输入和输出,以及一个可选的元素,过滤器。

输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,并且输出插件将数据写到目的地。

配置参考文件

file {  
  path => "/data/web/logstash/logFile/*/*"
  start_position => "beginning" #从文件开始处读写
}
 
有一些比较有用的配置项,可以用来指定 FileWatch 库的行为:
discover_interval 
    logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
exclude 
    不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
sincedb_path 
    默认路径$HOME/.sincedb(Windows:C:\Windows\System32\config\systemprofile\.sincedb),
    可以通过这个配置定义sincedb文件到其他位置。
sincedb_write_interval 
    logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
stat_interval 
    logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
start_position 
    logstash 从什么位置开始读取文件数据,默认是结束位置,
    即 logstash 进程会以类似 tail -F 的形式运行。
    如果要导入原有数据,把设定改成 "beginning",logstash 进程就从头开始读取,
    有点类似cat,但是读到最后一行不会终止,而是继续变成 tail -F。

6. Filter配置

filter {
  grok {
     match => { "path" => "/usr/local/test/logs/%{DATA:fileName}.log"}  #获取log文件名
  }
  if [message] =~ ".*Exception.*" {  # 格式化日志信息
    grok {
        match => { "message" => ".*%{LOGLEVEL:priority}.*uid=%{NUMBER:uid},traceId=%{DATA:traceId} .*\(%{DATA:method}@%{DATA:class}\.java:%{NUMBER:line}\)" }
    }
  }
  else {
    grok {
        mutate { replace => { "type" => "apache_access" } }
        match => {" message " => "\s*\[impl\]\[in\]requestId=%{NUMBER:reqId},reqTime=%{NUMBER:reqTime} req=%{GREEDYDATA:req}" }  # 会自动生成相应的JSON的_source的key-value
    }
  }
}

7. 指定Exeception异常提醒

filter {
  if [message] =~ "\s*(Arithmetic|Runtime)Exception\s*" {
    ruby {
         code => “”
        add_tag => "specific_exeception"
    }
  }
  grok {
     match => { "message" => "\s*\[impl\]\[in\] traceId=%{NUMBER:traceId},reqTime=%{NUMBER:reqTime} req=%{GREEDYDATA:req}" }
  }
  if [priority] == "ERROR" {
     grok {
        tag_on_failure => "error"
     }
  }
}
}

8. 预警 发送email提醒

# input 与 filter 参考上面的
output {
  # 如果event中标记为“error”,表示出错
  if "error" in [tags] {            
     email { 
        from => "shyn_xxxxx@163.com" 
        to => "123456@qq.com" 
        via => "smtp" 
        port => 25
        subject => "[%{@timestamp} xxx服务器 日志发现异常!]" 
        address => "smtp.163.com" 
        domain => "smtp.163.com" 
        body => "u have new bug ! %{message}" 
        username => "shyn_xxxxx@163.com" 
        password => "password" 
     }
  }
  stdout { codec => rubydebug }
}

9. 预警及发送http提醒

input {
    # 略
}
filter {
  grok {
     match => { "message" => "\s*\[impl\]\[in\] traceId=%{NUMBER:traceId},reqTime=%{NUMBER:reqTime} req=%{GREEDYDATA:req}" } 
    # remove_field => [ "message" ]
  }
  if [priority] == "ERROR" { #是错误 添加标记
     grok {
        tag_on_failure => "error"
     }
  }
  # 计数
  metrics {
      # 每60秒清空计数器  #should be a multiple of 5s
      clear_interval =>60
      # 每隔60秒统计一次  #Must be a multiple of 5s
      flush_interval =>60
      # 计数器数据保存的字段名 priority的值默认就有 是日志的级别
      meter => "events_%{priority}"
      # 增加"metric",作为标记
      add_tag => "metric"
      # 3秒内的message数据才统计,避免延迟
      # ignore_older_than => 3
  }
  # 如果event中标记为“metric”的
  if "metric" in [tags] {
      # 执行ruby代码
      ruby {
          # 如果level为warn的数量小于3条,就忽略此事件(即不发送任何消息)。
          code => "event.cancel if event['events_WARN']['count'] < 3"
      }
  }
}
output {
  # 如果event中标记为“metric”,表示只发送计数的消息。
  if "metric" in [tags]{
   # 通过http请求公众号发送微信消息。(此处简略描述)
      http {
          http_method => "post"  
          url => "http://116.1.1.112:33333/xxx?count=%{[events_WARN][count]}"
      }
  }
  stdout { codec => rubydebug } # 标准输出
 
#  elasticsearch {
#    action => "index"          #The operation on ES
#    hosts  => "127.0.0.1:9344"   #ElasticSearch host, can be array.
#    index  => "logstash-%{+YYYY.MM.dd}"         #The index to write data to.
#  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342