elk安装部署指南

组件介绍

Elasticsearch

是一个基于Lucene的搜索服务器,提供搜集、分析、存储数据三大功能,它提供了一个分布式多用户能力的全文搜索引擎,基于restful web接口,Elasticsearch是用java开发的,能够达到实时搜索、稳定、可靠、快速,安装使用方便。

Logstash

主要是用来日志的搜集、分析、过滤日志的工具,用于管理日志和事件的工具,你可以用它去搜集日志、转换日志、解析日志并将它们作为数据提供给其他模块调用。

Kibana

是一个优秀的前端日志展示框架,他可以非常详细的将日志转换为各种图表,为用户提供强大的数据可视话支持。它能够搜索、展示存储在Elasticsearch中索引数据,使用它可以很方便的用图标、表格、地图展示和分析数据。

Kafka

数据缓冲队列,作为消息队列解耦了处理过程,同时提高了可扩展性,具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

Filebeat

隶属于Beats,是一个轻量级数据收集引擎,基于原先的Logstash-fowarder的源码改造,也会是ELK Stack的第一选择。

目前Beats包含四种工具:

  • Packetbeat(搜索网络流量数据)
  • Metricbeat(搜集系统、进程和文件系统级别的CPU和内存使用情况等数据,通过从操作系统和服务器收集指标,帮助监控福区群以及其托管的服务)
  • Filebeat(收集文件数据-常用文件日志收集分析)
  • Winlogbeat(搜集windows事件日志数据)

版本说明

elk相关的组件版本最好保持一致

  • jdk1.8+
  • elasticsearch: 6.6.1
  • logstash: 6.6.1
  • kibana: 6.6.1
  • Filebeat: 6.6.1
  • kafka: 2.11-1

组件下载地址
官网搭建

组件架构图

elk架构图

依赖组件安装

jdk安装

jdk安装包自行下载解压

环境变量配置

linux中jdk环境变量:
echo '
JAVA_HOME=/usr/local/java
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
' >>/etc/profile

source /etc/profile

elk部署前环境的检查

服务器设置成固定ip

通过命令查看

/etc/sysconfig/network-scripts/ifcfg-ens33
服务器ip的查看

主机名修改

方便集群配置时服务器的区分
hostname : 显示默认主机名

  • 使用vim命令修改”/etc/hostname“主机名称文件
  • 将原始主机名称删除后追加”新主机名
  • 重启主机

防火墙是否关闭

systemctl status firewalld

确保每个节点防火墙高状态是关闭的,这样每个节点才能被相互访问

selinux状态

通过命令:getenforce
如果状态不是disabled时通过下面命令进行设置

vim /etc/selinux/config
selinux=disabled

iptables规则检查

不管你在安装linux时是否启动了防火墙,如果你想配置属于自己的防火墙,那就清除现在filter的所有规则,为了保证后面elk部署节点通信正常
通过命令进行查看

iptables -nL
iptables

如果不是上图所示,则需要进行iptables规则的清除

iptables -F      清除预设表filter中的所有规则链的规则
iptables -X      清除预设表filter中使用者自定链中的规则

服务器上文件的传递

为了方便各个服务器上进行文件的拷贝可以进行如下设置。

公钥的生成

ssh-keygen  服务器公钥生成,直接回车即可

ip的解析

多个服务器上传递该公钥的时候可以做ip和主机名映射,方便传输

#中添加ip和主机名的映射如下, 下面操作需要在每个节点进行配置,复制即可
vim /etc/hosts
172.16.80.159 lph_node1
172.16.80.160 lph_node2

各节点映射完成后可以进行数据和文件的拷贝

公钥的拷贝-各节点

ssh-copy-id -i #{ip映射的主机名}
占位符可以用真实的主机名替换

软件包的拷贝

公钥准备好之后进行组件软件包的拷贝

// `pwd`获取软件包素在路径,将该软件拷贝到指定节点下的对应路径下,#{占位符}
scp #{kafka_2.11-2.2.2.tgz} lph_client:`pwd` 

elasticsearch安装部署

在安装elasticsearch前可以先添加一个普通用户用来后续elasticsearch的启动操作,因为elasticsearch不能用root用户启动。
// 集群环境下每个节点都需要配置
useradd lphelk
// 设置密码,密码可以随意设值
echo "1" | passwd --stdin lphelk

安装包的解压

// 集群环境下每个节点都需要解压
tar -zxvf #{安装包位置} -C #{镇定解压到哪个位置}

配置文件的修改

命令:vim /elk/elasticsearch-6.6.1/config/elasticsearch.yml

# 注释掉的在是在集群环境下配置
#cluster.name: clustername
node.name: lph_node
node.master: true
node.data: true
# 需要创建相应的数据和日志路径
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
# 方便每个节点正常通行
network.host: 0.0.0.0
http.port: 9200
#discovery.zen.ping.unicast.hosts: ["172.16.80.160"]
#discovery.zen.minimum_master_nodes: 1
#discovery.zen.ping_timeout: 150s
#discovery.zen.fd.ping_retries: 10
#client.transprt.ping_timeout: 60s
http.cors.enabled: true
http.cors.allow-origin: "*"

配置项声明

配置 描述
cluster.name 集群名称,各个几点配置相同的集群名称。
node.name 节点名称,各个节点配置不同。
node.master 指示某个节点是否符合成为主节点的条件。
node.data 指示节点是否为数据节点,数据节点包含并管理索引的一部分。
path.data 数据存储目录。
path.logs 日志存储目录。
bootstrap.memory_lock 内存锁定,是否禁用交换。
bootstrap.system_call_filter 系统调用过滤器。
network.host 绑定节点ip。
http.port rest api端口。
discovery.zen.ping.unicast.hosts 提供其他elasticsearch服务节点单点广播发现功能。
discovery.zen.minimum_master_nodes 集群中可工作的具有master节点资格的最下数量,官方的推荐值是(N/2)+1,其中N是具有master资格的节点的数量。
discovery.zen.ping_timeout 节点在发现过程中的等待时间。
discovery.zen.fd.ping_retries 节点发现重试次数。
http.cors.enabled 是否允许跨源rest请求,用允许head插件访问elasticsearch。
http.cors.allow-origin 允许的源地址。

设置jvm堆大小

可以根据服务器本身配置进行优化

设置jvm堆大小(根据服务器内存大小进行设置,用2g替换1g)
sed -i 's/-Xms1g/-Xms2g/' /elk/elasticsearch-6.6.1/config/jvm.options
sed -i 's/-Xmx1g/-Xmx2g/' /elk/elasticsearch-6.6.1/config/jvm.options

注意:
确保堆内存最大和最小一样,防止程序运行时改变堆内存大小

创建ES数据及日志存储目录

mkdir -pv /data/elasticsearch/data
mkdir -pv /data/elasticsearch/logs

修改安装目录及存储目录权限

查看用户:cat /etc/passwd|grep #{上面创建启动es的普通用户}

chown -R lphelk:lphelk /data/elasticsearch
chown -R lphelk:lphelk /elk/elasticsearch-6.6.1

elasticsearch启动报错

elasticsearch启动报错
elasticsearch启动报错

上面如果修改了/etc/sysctl.conf文件后需要输入下面命令进行重新加载
sysctl -p

启动elasticsearch

// 切换到es普通用户,集群环境下多个节点需要同时启动(间隔时间切记太长)
su - lphelk
cd /elk/elasticsearch-6.6.1 
nohup bin/elasticsearch &

kibana安装部署

安装包的解压

tar -zxvf #{安装包位置} -C #{镇定解压到哪个位置}

配置文件修改

vim /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml
echo '
server.port: 5601
server.host: "172.16.80.159"
elasticsearch.url: "http://172.16.80.159:9200"
kibana.index: ".kibana"
' >> /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml

配置项声明

配置 描述
server.port kibana服务端口,默认为5601。
server.host kibana主机ip地址,默认localhost。
elasticsearch.url 用来做查询的es节点的url,默认http://localhost:9200
kibana.index kibana在elasticsearch中使用的索引来存储保存的searches,visualizations和dashboards,默认是 .kibana。

其他配置项可以参考

kibana启动

cd /elk/kibana-6.6.1-linux-x86_64/
nohup ./bin/kibana &

kibana启动后是可以直接访问的,不需要进行认证,测试环境可行

需要认证的话可以通过nginx和nginx中http-tools
http-tools用于生成nginx认证访问的用户密码文件,需要可自行百度

kafka安装部署

本文档是kafka单节点部署,集群自行百度。

安装配置jdk8

由于kafka、zookeeper(简称:zk)运行依赖于jdk,jdk已安装可跳过。

安装配置zk

kafka运行依赖zk,kafka官网提供的tar包中包含了zk,这里不需要额外下载zk包。

安装包的解压
tar -zxvf #{安装包位置} -C #{镇定解压到哪个位置}
配置文件修改
#vim /elk/kafka_2.11-2.2.2/config/zookeeper.properties

# 通过echo方式注入的话需要将对应配置文件中的相同配置项注释掉

echo '
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
#kafka集群ip:port
server.1=172.16.80.159:2888:3888
#server.1=172.16.80.160:2888:3888
#server.1=172.16.80.161:2888:3888

'>>/elk/kafka_2.11-2.2.2/config/zookeeper.properties

配置项声明
配置 描述
dataDir zk数据存放目录。
dataLogDir zk日志存放目录。
clientPort 客户端连接zk服务器的端口。
tickTime zk服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit 允许follower连接并同步到Leader的初始化连接时间,以tickTime为单位,当舒适化连接时间超过该值,则表示连接失败。
syncLimit Leader与Follower之间发生消息时,请求和应答时间长度,如果 follower在设置数据啊in内不能与leader通信,那么此follower将会被丢弃。
server.1 2888是follower与leader交换信息的端口,38888是当leader挂了时用来执行选举时服务器相互通信的端口。
创建数据和日志目录
mkdir -pv /data/zookeeper/data
mkdir -pv /data/zookeeper/logs
创建myid文件
# 每台kafka服务器都需要做成唯一的id,myid中对应的值对应上面配置文件中server.1中的值
echo 1 > /data/zookeeper/data/myid

配置kafka

修改配置文件
修改前可以先将原配置文件中的有效配置注释掉,可以通过下面命令:
sed -i 's/^[^#]/#&/' /elk/kafka_2.11-2.2.2/config/server.properties
echo '
broker.id=1
listeners=PLAINTEXT://172.16.80.159:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
# 单节点这个必须为1,否则可能会导致kafka消费失败
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
#zookeeper.connect=172.16.80.159:2181,172.16.80.160:2181
zookeeper.connect=172.16.80.159:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
' >>/elk/kafka_2.11-2.2.2/config/server.properties

配置项声明
配置 描述
broker.id 每个server需要单独配置broker id,如果不配置系统会自动配置,需要和上一步id一致。
listeners 监听地址,格式PLAINTEXT://IP:PORT
num.network.threads 接收和发送网络信息的线程数。
num.io.threads 服务器用于处理请求的线程数,其中肯能包括磁盘i/o。
socket.send.buffer.bytes 套接字服务器使用的发送缓冲区。
socket.receive.buffer.bytes 套接字服务器使用接收缓冲区。
socket.request.max.bytes 套接字服务器请求的最大大小(防止oom)。
log.dirs 日志文件目录。
num.partitions partition数量。
num.recovery.threads.per.data.dir 在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量。默认。
offsets.topic.replication.factor 偏移量话题的复制因子。
transaction.state.log.replication.factor -
transaction.state.log.min.isr -
log.retention.hours 日志文件删除之前保留的时间单位小时,默认168。
log.segment.bytes 单个日志文件的大小,默认1073741824 。
log.retention.check.interval.ms 检查日志段以查看是否可以根据保留策略删除他们的时间间隔。
zookeeper.connect zk的主机地址,如果zookeeper是集群的话则以逗号隔开。
zookeeper.connection.timeout.ms 接收到zk的超时时间。
group.initial.rebalance.delay.ms -
创建log目录
mkdir -pv /data/kafka/logs
其他节点配置

针对集群环境下,只需要把配置好的安装包分发到其他节点,然后修改zk的myid,kafkade broker.id和listeners就可以了,单节点不用配置这一步。

启动,验证zk

(1) 启动

cd /elk/kafka_2.11-2.2.2/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

集群的话需要每个节点依次执行启动操作

(2)验证

yum -y install nc

echo conf | nc 127.0.0.1 2181
zk启动验证

(3)查看zk的状态

echo stat | nc 127.0.0.1 2181
查看zk的状态
查看zk的状态
启动,验证kafka

(1) 启动

cd /elk/kafka_2.11-2.2.2
nohup bin/kafka-server-start.sh config/server.properties &

集群环境下依次启动即可

(2) 验证
在kafka服务器上创建topic

cd /elk/kafka_2.11-2.2.2

bin/kafka-topics.sh --create --zookeeper 172.16.80.159:2181 --replication-factor 1 --partitions 1 --topic testtopic


查看topic:
bin/kafka-topics.sh --zookeeper 172.16.80.159:2181 --list


kafka集群环境下的验证topic
监控kafka manager

elk中主要收集日志,不需要kafka manager,需要的话可以自行官网下载

logstash安装部署

Logstash运行同样依赖jdk,需要在服务器上安装jdk版本。
(1) 安装

tar -zxvf #{安装包位置} -C #{指定解压到哪个位置}

(2) 配置
创建目录,我将所有的input、filter、output配置文件全部放到该目录中。

mkdir -pv /elk/logstash-6.6.1/etc/conf.d

vim /elk/logstash-6.6.1/etc/conf.d/input.conf

input {
  # 从kafka获取日志数据到logstash
  kafka {
    bootstrap_servers => "172.16.4.35:9092"
    auto_offset_reset => "latest"
    consumer_threads => 1
    decorate_events => true
    topics => ["emp_log_topic"]
    codec => "json"
  }
}



vim /elk/logstash-6.6.1/etc/conf.d/output.conf

output {
        # 输出到elasticsearch中
        elasticsearch {
        index => "emp-log-%{+YYYY-MM-dd}"
        hosts => ["172.16.4.35:9200"]
        }

        # 输出到控制台,在测试的时候可以配置
        stdout {
        codec => rubydebug
        }

}

vim /elk/logstash-6.6.1/etc/conf.d/filter.conf

filter {
  # 过滤日志中不包含指定信息:【lph123578】的日志输出到es中,看需求配置
   if ([message] =~  "^(?!.*?lph123578).*$") {
       drop {}
   }

    # 通过grok 正则的方式进行日志内容的提取并赋值到指定的字段上,如下面的messageDate、appName等
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
    }
}

具体grok配置可以参考下面网页
https://www.jianshu.com/p/443f1ea7b640
https://www.jianshu.com/p/5df5055070b2?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation


(3) 启动

cd /elk/logstash-6.6.1

nohup bin/logstash -f etc/conf.d/ --config.reload.automatic &

(4) 报错信息
启动logstash有如下错误时可以按照下面的方式尝试解决,没有报错既可以跳过。

logstash报错信息

解决方案


解决方案

解决方案

Filebeat安装部署

为什么用Filebeat,而不用原来的Logstash呢?原因很简单,每个服务上按照一个logstash对资源消耗比较大。

解压

tar -zxvf #{安装包位置} -C #{指定解压到哪个位置}

cd /elk/filebeat-6.6.1-linux-x86_64

修改配置

修改filebeat配置,支持收集本地目录日志,并输出日志到kafka中。

cd /elk/filebeat
// 之前的filebeat.yml可以备份下
mv filebeat.yml filebeat.yml.bak
vim filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/easymainance/easymaintenance-log-manager-1.0-SNAPSHOT/D:/log/emp-log-manager/config-service.log

 # 解析的是text文本内容,不规则的
  multiline:
     # 指的是每行以时间格式开始的字符串
     pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' 
     negate: true
     match: after
     max_lines: 2000
     timeout: 2s

  # 解析的是json格式的内容
  #json.keys_under_root: true
  # json.add_error_key: true
output.kafka:
  enabled: true
  hosts: ["172.16.4.35:9092"]
  topic: 'emp_log_topic'

Filebeat6.0之后一些配置参数变动比较大,比如document_type就不支持,需要用fields来代替等等。

配置项声明

参数 描述
json.keys_under_root 可以让字段位于根节点,默认为false
json.overwrite_keys 对于同名的key,覆盖原有key值
json.message_key -
json.add_error_key 将错误消息记录存储在error.message字段中

检查配置是否正确

cd /elk/filebeat
./filebeat -c filebeat.yml -configtest

启动

cd /elk/filebeat

nohup ./filebeat -e -c filebeat.yml &

日志收集

日志格式

%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appname:-}] [%thread] [%X{traceId:-}] [%X{keyword1:-}] %-5level %logger{50}:%L - %msg%n

格式说明

  • %date{yyyy-MM-dd HH:mm:ss.SSS}:日志的时间2020-11-16 17:14:58.661
  • %X{appname:-} : 应用服务名称
  • %thread: 输出产生该日志事件的线程名
  • %X{traceId:-} : 自定义流程追踪Id
  • %X{keyword1:-}: 关键字1
  • %-5level: 输出日志级别,-5表示左对齐并且固定输出5个字符如果不足在右边补空格,类型有DEBUG,INFO,WARN,ERROR
  • %logger{50}: 为输出日志的class命名 限定长度50
  • %msg%n:输出的日志内容

日志格式输出内容展示

2020-11-16 17:14:58.661 [emp-system-manager] [main] [tradeId] [keyword1] INFO com.hzsun.emp.config.ConfigApplication:61 - Started ConfigApplication in 8.757 seconds (JVM running for 9.172)

日志埋点

单服务\单节点日志埋点

package com.hzsun.emp.aop;

import com.hzsun.emp.constant.FileLogContants;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.UUID;

// 处理的单服务日志埋点
@Aspect
@Component
public class FileLogAspect {

    public static final String exepress = "execution(public * com.hzsun.emp.*.controllers.*.*(..))";

    @Value("${spring.application.name}")
    private String applicationName;

    @Pointcut(exepress)
    public void setUpBuriedPoint() {
    }

    @Around("setUpBuriedPoint()")
    public Object markFileLog(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        System.err.println("服务名称:"+ applicationName);
        // 日志埋点
        insertMDC();
        Object result = proceedingJoinPoint.proceed();
        MDC.clear();

        return result;
    }

    private void insertMDC() {
        String traceId = UUID.randomUUID().toString().replace("-", "");
        MDC.put(FileLogContants.TRADE_ID, traceId + FileLogContants.FILTER_FIELD);
        MDC.put(FileLogContants.SERVICE_NAME, applicationName);
        MDC.put(FileLogContants.APP_NAME, FileLogContants.EASY_MAINTENANCE);
    }
}

分布式服务\多服务调用日志埋点

LogTrackInterceptorConfig.class
LogTrackInterceptor.class
EmpFeignConfig.class
LogService.class
package com.hzsun.emp.jwt;

import interceptor.LogTrackInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * 添加拦截器
 * @author lph
 */
@Configuration
public class LogTrackInterceptorConfig implements WebMvcConfigurer {

   @Bean
   public LogTrackInterceptor getLogTrackInterceptor() {
       return new LogTrackInterceptor();
   }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        /**
         *  根据具体情况设置
         *  LoginInterceptor是自定义的拦截器
         *  addPathPatterns参数/**是通配符表示拦截所有的请求
         *  excludePathPatterns方法的参数是可变参数,可以输n个字符串类型的参数,用来添加白名单
         */
        String[] excludePath = {
                "/doc.html",
                "/swagger-resources/**",
                "/webjars/**",
                "/v2/**",
                "/swagger-ui.html/**"
        };
        registry.addInterceptor(getLogTrackInterceptor()).addPathPatterns("/**").excludePathPatterns(excludePath);
    }
}

package interceptor;

import com.hzsun.emp.constant.FileLogContants;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Optional;
import java.util.UUID;


/**
 * 日志追踪拦截器,feign服务间调用确保traceId一致
 *
 * @version V1.0
 * @author: lph
 */
@Component
public class LogTrackInterceptor implements HandlerInterceptor {
    private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL = new ThreadLocal<>();

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 请求头中不存在则生成
        String traceId = Optional.ofNullable(request.getHeader(FileLogContants.TRACE_ID)).orElse(UUID.randomUUID().toString().replaceAll("-", ""));

        TRACE_ID_THREAD_LOCAL.set(traceId);
        MDC.put(FileLogContants.TRACE_ID, getTraceId());
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 完成后移除,防止内存占用过大
        TRACE_ID_THREAD_LOCAL.remove();
        MDC.clear();
    }

    public static String getTraceId() {
        return TRACE_ID_THREAD_LOCAL.get();
    }

    public static void setTraceId(String traceId) {
        TRACE_ID_THREAD_LOCAL.set(traceId);
    }
}

package com.hzsun.emp.configuration;

import com.hzsun.emp.constant.FileLogContants;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import interceptor.LogTrackInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.util.Optional;
import java.util.UUID;

/**
 * Feign调用时添加追踪id到请求头
 *
 * @author :lph
 * @version: 1.0
 */
@Configuration
public class EmpFeignConfig implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        // 非空校验的目的是防止在启动类中通过feign调用第三方接口时抛出空指针异常
        if (attributes != null) {
            // 将traceId传递到下个服务
            String traceId = Optional.ofNullable(LogTrackInterceptor.getTraceId()).orElse(UUID.randomUUID().toString().replaceAll("-",""));
            requestTemplate.header(FileLogContants.TRACE_ID, traceId);
        }
    }
}

package com.hzsun.emp.serviceapi;

import com.hzsun.emp.configuration.EmpFeignConfig;
import com.hzsun.emp.pojo.po.ScOperationLogsPO;
import com.hzsun.emp.response.HZSunResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;

/**
 * feign调用时在注解中配置
 * value: 指的是被调用方项目在nacos唯一标识  configuration:指的是feign请求的拦截器
 * @FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
 * @author :lph
 * @version: 1.0
 */
@FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
public interface LogService {

    /**
     * 获取数据并插入消息记录表
     *
     * @param logsPo 封装请求参数对象
     */
    @PostMapping("/logmanager/log/insert")
    HZSunResponse insertLog(ScOperationLogsPO logsPo);
}

grok提取日志内容

在vim /elk/logstash-6.6.1/etc/conf.d/filter.conf配置

方式一:
处理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}] [%X{serviceName:-${processName}}] [%thread|%X{traceId:-}] [%X{key1:-}] [%X{key2:-}] %-5level %logger{50}:%L - %msg%n

grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
    }
方式二:
处理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}|%X{serviceName:-${processName}}|%thread|%X{traceId:-}|%X{key1:-}|%X{key2:-}] %-5level %logger{50}:%L - %msg%n

grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]*)\|(?<serviceName>[^\[\]]*)\|(?<thread>[^\[\]]*)\|(?<traceId>[^\[\]]*)\|(?<key1>[^\[\]]*)\|(?<key2>[^\[\]]*)\] %{LOGLEVEL:level}"}
    }

# 可以选择性的使用remove_field去掉原始内容message这一行的信息。
mutate{
            remove_field => ["message","timestamp"]
        }
kibana中对grok中filter的测试
es中日志在kibana中展示

springboot整合elasticsearch

导入依赖

 <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>6.6.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>transport</artifactId>
                <version>6.6.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client-->
            <dependency>
                <groupId>org.elasticsearch.plugin</groupId>
                <artifactId>transport-netty4-client</artifactId>
                <version>6.6.1</version>
            </dependency>

常量类

public class FileLogContants {
    public static final String APP_NAME = "appName";
    public static final String SERVICE_NAME = "serviceName";
    public static final String TRADE_ID = "tradeId";
    public static final String KEY_WORD_1 = "keyword1";
    public static final String EASY_MAINTENANCE = "易维";
    public static final String FILTER_FIELD = "lph123578";
    public static final String MESSAGEDATE = "messageDate";

    // 通配符
    public static final String STARS_SYMBOL = "*";

    // 排序
    public static final String TIMESTAMP = "@timestamp";

    // total用于es记录总数的查询
    public static final Integer TOTAL = 0;

    // 关键字标识,es字符串凡是标识为关键字则不会进行分词处理
    public static final String ES_KEYWORD_SYMBOL = ".keyword";

}

elasticsearch配置类

package com.hzsun.emp.log.config;

import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;

@Configuration
public class ElasticSearchConfig {

    public static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private Integer port;

    @Value("${elasticsearch.cluster-name}")
    private String clusterName;

    private TransportClient transportClient;

    @Bean
    public TransportClient transportClient(){
        Settings settings = Settings.EMPTY;
        if(!StringUtil.isEmpty(clusterName)){
            settings = Settings.builder()
                    .put("cluster.name", clusterName)
                    .build();
        }
        try {
            transportClient = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
            // 集群环境可以尝试用addTransportAddresses()方法
        } catch (UnknownHostException e) {
            logger.error("创建elasticsearch客户端失败");
            e.printStackTrace();
        }
        logger.info("创建elasticsearch客户端成功");
        return transportClient;
    }


    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {

        Settings settings = Settings.EMPTY;
        if(!StringUtil.isEmpty(clusterName)){
            settings = Settings.builder()
                    .put("cluster.name", clusterName)
                    .build();
        }

        TransportClient transportClient = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));

        return BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
            }

        }).setBulkActions(1000)//分批,每10000条请求当成一批请求。默认值为1000
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//每次5MB,刷新一次bulk。默认为5m
                .setFlushInterval(TimeValue.timeValueSeconds(5))//每5秒一定执行,不管已经队列积累了多少。默认不设置这个值
                .setConcurrentRequests(1)//设置并发请求数,如果是0,那表示只有一个请求就可以被执行,如果为1,则可以积累并被执行。默认为1.
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//这里有个backoff策略,最初等待100ms,然后按照指数增长并重试3次。每当一个或者多个bulk请求失败,并出现EsRejectedExecutionException异常时.就会尝试重试。这个异常表示用于处理请求的可用计算资源太少。如果要禁用这个backoff策略,需要用backoff.nobackoff()。
                .build();
    }

    @PostConstruct
    void init() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }

}

elasticsearch检索

package com.hzsun.emp.log.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hzsun.emp.constant.FileLogContants;
import com.hzsun.emp.log.bean.FileLogDTO;
import com.hzsun.emp.log.bean.FileLogQO;
import com.hzsun.emp.log.service.IFileLogService;
import com.hzsun.emp.utils.DateUtil;
import com.hzsun.emp.utils.ObjectMapperUtil;
import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

@Service
public class FileLogServiceImpl implements IFileLogService {

    public static final Logger logger = LoggerFactory.getLogger(FileLogServiceImpl.class);

    @Autowired
    private TransportClient transportClient;

    @Value("${elasticsearch.index}")
    private String elasticIndex;

    @Value("${elasticsearch.type}")
    private String elasticType;

    @Override
    public JSONObject selectLogFromElastic(FileLogQO logQo) {
        logger.info("进行日志埋点");
        // 进行日志埋点操作
        MDC.put(FileLogContants.KEY_WORD_1, "关键字");

        // 根据传入的参数进行查询条件的拼接
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        if (!StringUtil.isEmpty(logQo.getAppName())) {
            // termQuery中参数加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分词器进行分词影响查询
            queryBuilder.must(QueryBuilders.termQuery(FileLogContants.APP_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getAppName()));
        }
        if (!StringUtil.isEmpty(logQo.getServiceName())) {
            queryBuilder.must(QueryBuilders.termQuery(FileLogContants.SERVICE_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getServiceName()));
        }
        if (!StringUtil.isEmpty(logQo.getKeyword1())) {
            queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.KEY_WORD_1, FileLogContants.STARS_SYMBOL + logQo.getKeyword1() + FileLogContants.STARS_SYMBOL));
        }
        if (!StringUtil.isEmpty(logQo.getTradeId())) {
            queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.TRADE_ID, FileLogContants.STARS_SYMBOL + logQo.getTradeId() + FileLogContants.STARS_SYMBOL));
        }
        if (!StringUtil.isEmpty(logQo.getBeginTime()) && !StringUtil.isEmpty(logQo.getEndTime())) {
            // rangeQuery中参数加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分词器进行分词影响时间范围查询
            queryBuilder.must(QueryBuilders.rangeQuery(FileLogContants.MESSAGEDATE + FileLogContants.ES_KEYWORD_SYMBOL).from(logQo.getBeginTime()).to(logQo.getEndTime()));
        }

        int offset = logQo.getOffset();
        SearchRequestBuilder builder = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
                .setQuery(queryBuilder)
                .setFrom(offset)
                .setSize(logQo.getPageSize())
                // 根据指定字段进行排序
                .addSort(FileLogContants.TIMESTAMP, SortOrder.DESC)
                //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面
                .setExplain(true);


        List<FileLogDTO> result = new ArrayList<>();
        SearchResponse searchResponse = builder.get();

        for (SearchHit hit : searchResponse.getHits()) {
            String messageJson = ObjectMapperUtil.writeValueAsString(hit.getSourceAsMap());
            FileLogDTO fileLogDTO = ObjectMapperUtil.readValue(messageJson, FileLogDTO.class);
            assert fileLogDTO != null;
            fileLogDTO.setId(hit.getId());
            result.add(fileLogDTO);
        }

        JSONObject object = new JSONObject();
        object.put("file_log_list", result);
        return object;
    }

    @Override
    public long getTotalSize() {
        // 查询总条数
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        SearchHits hits = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
                .setQuery(boolQueryBuilder)
                // 设置为0的时候即查询所有数据总数
                .setSize(FileLogContants.TOTAL)
                .get().getHits();

        return hits.getTotalHits();
    }
}

备注

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容