组件介绍
Elasticsearch
是一个基于Lucene的搜索服务器,提供搜集、分析、存储数据三大功能,它提供了一个分布式多用户能力的全文搜索引擎,基于restful web接口,Elasticsearch是用java开发的,能够达到实时搜索、稳定、可靠、快速,安装使用方便。
Logstash
主要是用来日志的搜集、分析、过滤日志的工具,用于管理日志和事件的工具,你可以用它去搜集日志、转换日志、解析日志并将它们作为数据提供给其他模块调用。
Kibana
是一个优秀的前端日志展示框架,他可以非常详细的将日志转换为各种图表,为用户提供强大的数据可视话支持。它能够搜索、展示存储在Elasticsearch中索引数据,使用它可以很方便的用图标、表格、地图展示和分析数据。
Kafka
数据缓冲队列,作为消息队列解耦了处理过程,同时提高了可扩展性,具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
Filebeat
隶属于Beats,是一个轻量级数据收集引擎,基于原先的Logstash-fowarder的源码改造,也会是ELK Stack的第一选择。
目前Beats包含四种工具:
- Packetbeat(搜索网络流量数据)
- Metricbeat(搜集系统、进程和文件系统级别的CPU和内存使用情况等数据,通过从操作系统和服务器收集指标,帮助监控福区群以及其托管的服务)
- Filebeat(收集文件数据-常用文件日志收集分析)
- Winlogbeat(搜集windows事件日志数据)
版本说明
elk相关的组件版本最好保持一致
- jdk1.8+
- elasticsearch: 6.6.1
- logstash: 6.6.1
- kibana: 6.6.1
- Filebeat: 6.6.1
- kafka: 2.11-1
组件架构图
依赖组件安装
jdk安装
jdk安装包自行下载解压
环境变量配置
linux中jdk环境变量:
echo '
JAVA_HOME=/usr/local/java
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
' >>/etc/profile
source /etc/profile
elk部署前环境的检查
服务器设置成固定ip
通过命令查看
/etc/sysconfig/network-scripts/ifcfg-ens33
主机名修改
方便集群配置时服务器的区分
hostname : 显示默认主机名
- 使用vim命令修改”/etc/hostname“主机名称文件
- 将原始主机名称删除后追加”新主机名
- 重启主机
防火墙是否关闭
systemctl status firewalld
确保每个节点防火墙高状态是关闭的,这样每个节点才能被相互访问
selinux状态
通过命令:getenforce
如果状态不是disabled时通过下面命令进行设置
vim /etc/selinux/config
selinux=disabled
iptables规则检查
不管你在安装linux时是否启动了防火墙,如果你想配置属于自己的防火墙,那就清除现在filter的所有规则,为了保证后面elk部署节点通信正常
通过命令进行查看
iptables -nL
如果不是上图所示,则需要进行iptables规则的清除
iptables -F 清除预设表filter中的所有规则链的规则
iptables -X 清除预设表filter中使用者自定链中的规则
服务器上文件的传递
为了方便各个服务器上进行文件的拷贝可以进行如下设置。
公钥的生成
ssh-keygen 服务器公钥生成,直接回车即可
ip的解析
多个服务器上传递该公钥的时候可以做ip和主机名映射,方便传输
#中添加ip和主机名的映射如下, 下面操作需要在每个节点进行配置,复制即可
vim /etc/hosts
172.16.80.159 lph_node1
172.16.80.160 lph_node2
各节点映射完成后可以进行数据和文件的拷贝
公钥的拷贝-各节点
ssh-copy-id -i #{ip映射的主机名}
占位符可以用真实的主机名替换
软件包的拷贝
公钥准备好之后进行组件软件包的拷贝
// `pwd`获取软件包素在路径,将该软件拷贝到指定节点下的对应路径下,#{占位符}
scp #{kafka_2.11-2.2.2.tgz} lph_client:`pwd`
elasticsearch安装部署
在安装elasticsearch前可以先添加一个普通用户用来后续elasticsearch的启动操作,因为elasticsearch不能用root用户启动。
// 集群环境下每个节点都需要配置
useradd lphelk
// 设置密码,密码可以随意设值
echo "1" | passwd --stdin lphelk
安装包的解压
// 集群环境下每个节点都需要解压
tar -zxvf #{安装包位置} -C #{镇定解压到哪个位置}
配置文件的修改
命令:vim /elk/elasticsearch-6.6.1/config/elasticsearch.yml
# 注释掉的在是在集群环境下配置
#cluster.name: clustername
node.name: lph_node
node.master: true
node.data: true
# 需要创建相应的数据和日志路径
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
# 方便每个节点正常通行
network.host: 0.0.0.0
http.port: 9200
#discovery.zen.ping.unicast.hosts: ["172.16.80.160"]
#discovery.zen.minimum_master_nodes: 1
#discovery.zen.ping_timeout: 150s
#discovery.zen.fd.ping_retries: 10
#client.transprt.ping_timeout: 60s
http.cors.enabled: true
http.cors.allow-origin: "*"
配置项声明
配置 | 描述 |
---|---|
cluster.name | 集群名称,各个几点配置相同的集群名称。 |
node.name | 节点名称,各个节点配置不同。 |
node.master | 指示某个节点是否符合成为主节点的条件。 |
node.data | 指示节点是否为数据节点,数据节点包含并管理索引的一部分。 |
path.data | 数据存储目录。 |
path.logs | 日志存储目录。 |
bootstrap.memory_lock | 内存锁定,是否禁用交换。 |
bootstrap.system_call_filter | 系统调用过滤器。 |
network.host | 绑定节点ip。 |
http.port | rest api端口。 |
discovery.zen.ping.unicast.hosts | 提供其他elasticsearch服务节点单点广播发现功能。 |
discovery.zen.minimum_master_nodes | 集群中可工作的具有master节点资格的最下数量,官方的推荐值是(N/2)+1,其中N是具有master资格的节点的数量。 |
discovery.zen.ping_timeout | 节点在发现过程中的等待时间。 |
discovery.zen.fd.ping_retries | 节点发现重试次数。 |
http.cors.enabled | 是否允许跨源rest请求,用允许head插件访问elasticsearch。 |
http.cors.allow-origin | 允许的源地址。 |
设置jvm堆大小
可以根据服务器本身配置进行优化
设置jvm堆大小(根据服务器内存大小进行设置,用2g替换1g)
sed -i 's/-Xms1g/-Xms2g/' /elk/elasticsearch-6.6.1/config/jvm.options
sed -i 's/-Xmx1g/-Xmx2g/' /elk/elasticsearch-6.6.1/config/jvm.options
注意:
确保堆内存最大和最小一样,防止程序运行时改变堆内存大小
创建ES数据及日志存储目录
mkdir -pv /data/elasticsearch/data
mkdir -pv /data/elasticsearch/logs
修改安装目录及存储目录权限
查看用户:cat /etc/passwd|grep #{上面创建启动es的普通用户}
chown -R lphelk:lphelk /data/elasticsearch
chown -R lphelk:lphelk /elk/elasticsearch-6.6.1
elasticsearch启动报错
上面如果修改了/etc/sysctl.conf文件后需要输入下面命令进行重新加载
sysctl -p
启动elasticsearch
// 切换到es普通用户,集群环境下多个节点需要同时启动(间隔时间切记太长)
su - lphelk
cd /elk/elasticsearch-6.6.1
nohup bin/elasticsearch &
kibana安装部署
安装包的解压
tar -zxvf #{安装包位置} -C #{镇定解压到哪个位置}
配置文件修改
vim /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml
echo '
server.port: 5601
server.host: "172.16.80.159"
elasticsearch.url: "http://172.16.80.159:9200"
kibana.index: ".kibana"
' >> /elk/kibana-6.6.1-linux-x86_64/config/kibana.yml
配置项声明
配置 | 描述 |
---|---|
server.port | kibana服务端口,默认为5601。 |
server.host | kibana主机ip地址,默认localhost。 |
elasticsearch.url | 用来做查询的es节点的url,默认http://localhost:9200。 |
kibana.index | kibana在elasticsearch中使用的索引来存储保存的searches,visualizations和dashboards,默认是 .kibana。 |
kibana启动
cd /elk/kibana-6.6.1-linux-x86_64/
nohup ./bin/kibana &
kibana启动后是可以直接访问的,不需要进行认证,测试环境可行
需要认证的话可以通过nginx和nginx中http-tools
http-tools用于生成nginx认证访问的用户密码文件,需要可自行百度
kafka安装部署
本文档是kafka单节点部署,集群自行百度。
安装配置jdk8
由于kafka、zookeeper(简称:zk)运行依赖于jdk,jdk已安装可跳过。
安装配置zk
kafka运行依赖zk,kafka官网提供的tar包中包含了zk,这里不需要额外下载zk包。
安装包的解压
tar -zxvf #{安装包位置} -C #{镇定解压到哪个位置}
配置文件修改
#vim /elk/kafka_2.11-2.2.2/config/zookeeper.properties
# 通过echo方式注入的话需要将对应配置文件中的相同配置项注释掉
echo '
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
#kafka集群ip:port
server.1=172.16.80.159:2888:3888
#server.1=172.16.80.160:2888:3888
#server.1=172.16.80.161:2888:3888
'>>/elk/kafka_2.11-2.2.2/config/zookeeper.properties
配置项声明
配置 | 描述 |
---|---|
dataDir | zk数据存放目录。 |
dataLogDir | zk日志存放目录。 |
clientPort | 客户端连接zk服务器的端口。 |
tickTime | zk服务器之间或客户端与服务器之间维持心跳的时间间隔。 |
initLimit | 允许follower连接并同步到Leader的初始化连接时间,以tickTime为单位,当舒适化连接时间超过该值,则表示连接失败。 |
syncLimit | Leader与Follower之间发生消息时,请求和应答时间长度,如果 follower在设置数据啊in内不能与leader通信,那么此follower将会被丢弃。 |
server.1 | 2888是follower与leader交换信息的端口,38888是当leader挂了时用来执行选举时服务器相互通信的端口。 |
创建数据和日志目录
mkdir -pv /data/zookeeper/data
mkdir -pv /data/zookeeper/logs
创建myid文件
# 每台kafka服务器都需要做成唯一的id,myid中对应的值对应上面配置文件中server.1中的值
echo 1 > /data/zookeeper/data/myid
配置kafka
修改配置文件
修改前可以先将原配置文件中的有效配置注释掉,可以通过下面命令:
sed -i 's/^[^#]/#&/' /elk/kafka_2.11-2.2.2/config/server.properties
echo '
broker.id=1
listeners=PLAINTEXT://172.16.80.159:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
# 单节点这个必须为1,否则可能会导致kafka消费失败
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
#zookeeper.connect=172.16.80.159:2181,172.16.80.160:2181
zookeeper.connect=172.16.80.159:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
' >>/elk/kafka_2.11-2.2.2/config/server.properties
配置项声明
配置 | 描述 |
---|---|
broker.id | 每个server需要单独配置broker id,如果不配置系统会自动配置,需要和上一步id一致。 |
listeners | 监听地址,格式PLAINTEXT://IP:PORT。 |
num.network.threads | 接收和发送网络信息的线程数。 |
num.io.threads | 服务器用于处理请求的线程数,其中肯能包括磁盘i/o。 |
socket.send.buffer.bytes | 套接字服务器使用的发送缓冲区。 |
socket.receive.buffer.bytes | 套接字服务器使用接收缓冲区。 |
socket.request.max.bytes | 套接字服务器请求的最大大小(防止oom)。 |
log.dirs | 日志文件目录。 |
num.partitions | partition数量。 |
num.recovery.threads.per.data.dir | 在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量。默认。 |
offsets.topic.replication.factor | 偏移量话题的复制因子。 |
transaction.state.log.replication.factor | - |
transaction.state.log.min.isr | - |
log.retention.hours | 日志文件删除之前保留的时间单位小时,默认168。 |
log.segment.bytes | 单个日志文件的大小,默认1073741824 。 |
log.retention.check.interval.ms | 检查日志段以查看是否可以根据保留策略删除他们的时间间隔。 |
zookeeper.connect | zk的主机地址,如果zookeeper是集群的话则以逗号隔开。 |
zookeeper.connection.timeout.ms | 接收到zk的超时时间。 |
group.initial.rebalance.delay.ms | - |
创建log目录
mkdir -pv /data/kafka/logs
其他节点配置
针对集群环境下,只需要把配置好的安装包分发到其他节点,然后修改zk的myid,kafkade broker.id和listeners就可以了,单节点不用配置这一步。
启动,验证zk
(1) 启动
cd /elk/kafka_2.11-2.2.2/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
集群的话需要每个节点依次执行启动操作
(2)验证
yum -y install nc
echo conf | nc 127.0.0.1 2181
(3)查看zk的状态
echo stat | nc 127.0.0.1 2181
启动,验证kafka
(1) 启动
cd /elk/kafka_2.11-2.2.2
nohup bin/kafka-server-start.sh config/server.properties &
集群环境下依次启动即可
(2) 验证
在kafka服务器上创建topic
cd /elk/kafka_2.11-2.2.2
bin/kafka-topics.sh --create --zookeeper 172.16.80.159:2181 --replication-factor 1 --partitions 1 --topic testtopic
查看topic:
bin/kafka-topics.sh --zookeeper 172.16.80.159:2181 --list
监控kafka manager
elk中主要收集日志,不需要kafka manager,需要的话可以自行官网下载
logstash安装部署
Logstash运行同样依赖jdk,需要在服务器上安装jdk版本。
(1) 安装
tar -zxvf #{安装包位置} -C #{指定解压到哪个位置}
(2) 配置
创建目录,我将所有的input、filter、output配置文件全部放到该目录中。
mkdir -pv /elk/logstash-6.6.1/etc/conf.d
vim /elk/logstash-6.6.1/etc/conf.d/input.conf
input {
# 从kafka获取日志数据到logstash
kafka {
bootstrap_servers => "172.16.4.35:9092"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["emp_log_topic"]
codec => "json"
}
}
vim /elk/logstash-6.6.1/etc/conf.d/output.conf
output {
# 输出到elasticsearch中
elasticsearch {
index => "emp-log-%{+YYYY-MM-dd}"
hosts => ["172.16.4.35:9200"]
}
# 输出到控制台,在测试的时候可以配置
stdout {
codec => rubydebug
}
}
vim /elk/logstash-6.6.1/etc/conf.d/filter.conf
filter {
# 过滤日志中不包含指定信息:【lph123578】的日志输出到es中,看需求配置
if ([message] =~ "^(?!.*?lph123578).*$") {
drop {}
}
# 通过grok 正则的方式进行日志内容的提取并赋值到指定的字段上,如下面的messageDate、appName等
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
}
}
具体grok配置可以参考下面网页
https://www.jianshu.com/p/443f1ea7b640
https://www.jianshu.com/p/5df5055070b2?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
(3) 启动
cd /elk/logstash-6.6.1
nohup bin/logstash -f etc/conf.d/ --config.reload.automatic &
(4) 报错信息
启动logstash有如下错误时可以按照下面的方式尝试解决,没有报错既可以跳过。
解决方案
Filebeat安装部署
为什么用Filebeat,而不用原来的Logstash呢?原因很简单,每个服务上按照一个logstash对资源消耗比较大。
解压
tar -zxvf #{安装包位置} -C #{指定解压到哪个位置}
cd /elk/filebeat-6.6.1-linux-x86_64
修改配置
修改filebeat配置,支持收集本地目录日志,并输出日志到kafka中。
cd /elk/filebeat
// 之前的filebeat.yml可以备份下
mv filebeat.yml filebeat.yml.bak
vim filebeat.yml
filebeat.prospectors:
- input_type: log
paths:
- /usr/local/easymainance/easymaintenance-log-manager-1.0-SNAPSHOT/D:/log/emp-log-manager/config-service.log
# 解析的是text文本内容,不规则的
multiline:
# 指的是每行以时间格式开始的字符串
pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
negate: true
match: after
max_lines: 2000
timeout: 2s
# 解析的是json格式的内容
#json.keys_under_root: true
# json.add_error_key: true
output.kafka:
enabled: true
hosts: ["172.16.4.35:9092"]
topic: 'emp_log_topic'
Filebeat6.0之后一些配置参数变动比较大,比如document_type就不支持,需要用fields来代替等等。
配置项声明
参数 | 描述 |
---|---|
json.keys_under_root | 可以让字段位于根节点,默认为false |
json.overwrite_keys | 对于同名的key,覆盖原有key值 |
json.message_key | - |
json.add_error_key | 将错误消息记录存储在error.message字段中 |
检查配置是否正确
cd /elk/filebeat
./filebeat -c filebeat.yml -configtest
启动
cd /elk/filebeat
nohup ./filebeat -e -c filebeat.yml &
日志收集
日志格式
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appname:-}] [%thread] [%X{traceId:-}] [%X{keyword1:-}] %-5level %logger{50}:%L - %msg%n
格式说明
- %date{yyyy-MM-dd HH:mm:ss.SSS}:日志的时间2020-11-16 17:14:58.661
- %X{appname:-} : 应用服务名称
- %thread: 输出产生该日志事件的线程名
- %X{traceId:-} : 自定义流程追踪Id
- %X{keyword1:-}: 关键字1
- %-5level: 输出日志级别,-5表示左对齐并且固定输出5个字符如果不足在右边补空格,类型有DEBUG,INFO,WARN,ERROR
- %logger{50}: 为输出日志的class命名 限定长度50
- %msg%n:输出的日志内容
日志格式输出内容展示
2020-11-16 17:14:58.661 [emp-system-manager] [main] [tradeId] [keyword1] INFO com.hzsun.emp.config.ConfigApplication:61 - Started ConfigApplication in 8.757 seconds (JVM running for 9.172)
日志埋点
单服务\单节点日志埋点
package com.hzsun.emp.aop;
import com.hzsun.emp.constant.FileLogContants;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.UUID;
// 处理的单服务日志埋点
@Aspect
@Component
public class FileLogAspect {
public static final String exepress = "execution(public * com.hzsun.emp.*.controllers.*.*(..))";
@Value("${spring.application.name}")
private String applicationName;
@Pointcut(exepress)
public void setUpBuriedPoint() {
}
@Around("setUpBuriedPoint()")
public Object markFileLog(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
System.err.println("服务名称:"+ applicationName);
// 日志埋点
insertMDC();
Object result = proceedingJoinPoint.proceed();
MDC.clear();
return result;
}
private void insertMDC() {
String traceId = UUID.randomUUID().toString().replace("-", "");
MDC.put(FileLogContants.TRADE_ID, traceId + FileLogContants.FILTER_FIELD);
MDC.put(FileLogContants.SERVICE_NAME, applicationName);
MDC.put(FileLogContants.APP_NAME, FileLogContants.EASY_MAINTENANCE);
}
}
分布式服务\多服务调用日志埋点
LogTrackInterceptorConfig.class
LogTrackInterceptor.class
EmpFeignConfig.class
LogService.class
package com.hzsun.emp.jwt;
import interceptor.LogTrackInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* 添加拦截器
* @author lph
*/
@Configuration
public class LogTrackInterceptorConfig implements WebMvcConfigurer {
@Bean
public LogTrackInterceptor getLogTrackInterceptor() {
return new LogTrackInterceptor();
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
/**
* 根据具体情况设置
* LoginInterceptor是自定义的拦截器
* addPathPatterns参数/**是通配符表示拦截所有的请求
* excludePathPatterns方法的参数是可变参数,可以输n个字符串类型的参数,用来添加白名单
*/
String[] excludePath = {
"/doc.html",
"/swagger-resources/**",
"/webjars/**",
"/v2/**",
"/swagger-ui.html/**"
};
registry.addInterceptor(getLogTrackInterceptor()).addPathPatterns("/**").excludePathPatterns(excludePath);
}
}
package interceptor;
import com.hzsun.emp.constant.FileLogContants;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Optional;
import java.util.UUID;
/**
* 日志追踪拦截器,feign服务间调用确保traceId一致
*
* @version V1.0
* @author: lph
*/
@Component
public class LogTrackInterceptor implements HandlerInterceptor {
private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL = new ThreadLocal<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 请求头中不存在则生成
String traceId = Optional.ofNullable(request.getHeader(FileLogContants.TRACE_ID)).orElse(UUID.randomUUID().toString().replaceAll("-", ""));
TRACE_ID_THREAD_LOCAL.set(traceId);
MDC.put(FileLogContants.TRACE_ID, getTraceId());
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 完成后移除,防止内存占用过大
TRACE_ID_THREAD_LOCAL.remove();
MDC.clear();
}
public static String getTraceId() {
return TRACE_ID_THREAD_LOCAL.get();
}
public static void setTraceId(String traceId) {
TRACE_ID_THREAD_LOCAL.set(traceId);
}
}
package com.hzsun.emp.configuration;
import com.hzsun.emp.constant.FileLogContants;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import interceptor.LogTrackInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.util.Optional;
import java.util.UUID;
/**
* Feign调用时添加追踪id到请求头
*
* @author :lph
* @version: 1.0
*/
@Configuration
public class EmpFeignConfig implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
// 非空校验的目的是防止在启动类中通过feign调用第三方接口时抛出空指针异常
if (attributes != null) {
// 将traceId传递到下个服务
String traceId = Optional.ofNullable(LogTrackInterceptor.getTraceId()).orElse(UUID.randomUUID().toString().replaceAll("-",""));
requestTemplate.header(FileLogContants.TRACE_ID, traceId);
}
}
}
package com.hzsun.emp.serviceapi;
import com.hzsun.emp.configuration.EmpFeignConfig;
import com.hzsun.emp.pojo.po.ScOperationLogsPO;
import com.hzsun.emp.response.HZSunResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
/**
* feign调用时在注解中配置
* value: 指的是被调用方项目在nacos唯一标识 configuration:指的是feign请求的拦截器
* @FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
* @author :lph
* @version: 1.0
*/
@FeignClient(value = "emp-log-manager", configuration = EmpFeignConfig.class)
public interface LogService {
/**
* 获取数据并插入消息记录表
*
* @param logsPo 封装请求参数对象
*/
@PostMapping("/logmanager/log/insert")
HZSunResponse insertLog(ScOperationLogsPO logsPo);
}
grok提取日志内容
在vim /elk/logstash-6.6.1/etc/conf.d/filter.conf配置
方式一:
处理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}] [%X{serviceName:-${processName}}] [%thread|%X{traceId:-}] [%X{key1:-}] [%X{key2:-}] %-5level %logger{50}:%L - %msg%n
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]+)\] \[(?<thread>[^\[\]]+)\] \[(?<tradeId>[^\[\]]+)\] \[(?<keyword1>[^\[\]]*)\] %{LOGLEVEL:level}" }
}
方式二:
处理的日志格式:
%date{yyyy-MM-dd HH:mm:ss.SSS} [%X{appName:-${app_name}}|%X{serviceName:-${processName}}|%thread|%X{traceId:-}|%X{key1:-}|%X{key2:-}] %-5level %logger{50}:%L - %msg%n
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:messageDate} \[(?<appName>[^\[\]]*)\|(?<serviceName>[^\[\]]*)\|(?<thread>[^\[\]]*)\|(?<traceId>[^\[\]]*)\|(?<key1>[^\[\]]*)\|(?<key2>[^\[\]]*)\] %{LOGLEVEL:level}"}
}
# 可以选择性的使用remove_field去掉原始内容message这一行的信息。
mutate{
remove_field => ["message","timestamp"]
}
springboot整合elasticsearch
导入依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client-->
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>6.6.1</version>
</dependency>
常量类
public class FileLogContants {
public static final String APP_NAME = "appName";
public static final String SERVICE_NAME = "serviceName";
public static final String TRADE_ID = "tradeId";
public static final String KEY_WORD_1 = "keyword1";
public static final String EASY_MAINTENANCE = "易维";
public static final String FILTER_FIELD = "lph123578";
public static final String MESSAGEDATE = "messageDate";
// 通配符
public static final String STARS_SYMBOL = "*";
// 排序
public static final String TIMESTAMP = "@timestamp";
// total用于es记录总数的查询
public static final Integer TOTAL = 0;
// 关键字标识,es字符串凡是标识为关键字则不会进行分词处理
public static final String ES_KEYWORD_SYMBOL = ".keyword";
}
elasticsearch配置类
package com.hzsun.emp.log.config;
import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Configuration
public class ElasticSearchConfig {
public static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private Integer port;
@Value("${elasticsearch.cluster-name}")
private String clusterName;
private TransportClient transportClient;
@Bean
public TransportClient transportClient(){
Settings settings = Settings.EMPTY;
if(!StringUtil.isEmpty(clusterName)){
settings = Settings.builder()
.put("cluster.name", clusterName)
.build();
}
try {
transportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
// 集群环境可以尝试用addTransportAddresses()方法
} catch (UnknownHostException e) {
logger.error("创建elasticsearch客户端失败");
e.printStackTrace();
}
logger.info("创建elasticsearch客户端成功");
return transportClient;
}
@Bean
public BulkProcessor bulkProcessor() throws UnknownHostException {
Settings settings = Settings.EMPTY;
if(!StringUtil.isEmpty(clusterName)){
settings = Settings.builder()
.put("cluster.name", clusterName)
.build();
}
TransportClient transportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
return BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
logger.error("{} data bulk failed,reason :{}", bulkRequest.numberOfActions(), throwable);
}
}).setBulkActions(1000)//分批,每10000条请求当成一批请求。默认值为1000
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//每次5MB,刷新一次bulk。默认为5m
.setFlushInterval(TimeValue.timeValueSeconds(5))//每5秒一定执行,不管已经队列积累了多少。默认不设置这个值
.setConcurrentRequests(1)//设置并发请求数,如果是0,那表示只有一个请求就可以被执行,如果为1,则可以积累并被执行。默认为1.
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//这里有个backoff策略,最初等待100ms,然后按照指数增长并重试3次。每当一个或者多个bulk请求失败,并出现EsRejectedExecutionException异常时.就会尝试重试。这个异常表示用于处理请求的可用计算资源太少。如果要禁用这个backoff策略,需要用backoff.nobackoff()。
.build();
}
@PostConstruct
void init() {
System.setProperty("es.set.netty.runtime.available.processors", "false");
}
}
elasticsearch检索
package com.hzsun.emp.log.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hzsun.emp.constant.FileLogContants;
import com.hzsun.emp.log.bean.FileLogDTO;
import com.hzsun.emp.log.bean.FileLogQO;
import com.hzsun.emp.log.service.IFileLogService;
import com.hzsun.emp.utils.DateUtil;
import com.hzsun.emp.utils.ObjectMapperUtil;
import com.hzsun.emp.utils.StringUtil;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Service
public class FileLogServiceImpl implements IFileLogService {
public static final Logger logger = LoggerFactory.getLogger(FileLogServiceImpl.class);
@Autowired
private TransportClient transportClient;
@Value("${elasticsearch.index}")
private String elasticIndex;
@Value("${elasticsearch.type}")
private String elasticType;
@Override
public JSONObject selectLogFromElastic(FileLogQO logQo) {
logger.info("进行日志埋点");
// 进行日志埋点操作
MDC.put(FileLogContants.KEY_WORD_1, "关键字");
// 根据传入的参数进行查询条件的拼接
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
if (!StringUtil.isEmpty(logQo.getAppName())) {
// termQuery中参数加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分词器进行分词影响查询
queryBuilder.must(QueryBuilders.termQuery(FileLogContants.APP_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getAppName()));
}
if (!StringUtil.isEmpty(logQo.getServiceName())) {
queryBuilder.must(QueryBuilders.termQuery(FileLogContants.SERVICE_NAME + FileLogContants.ES_KEYWORD_SYMBOL, logQo.getServiceName()));
}
if (!StringUtil.isEmpty(logQo.getKeyword1())) {
queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.KEY_WORD_1, FileLogContants.STARS_SYMBOL + logQo.getKeyword1() + FileLogContants.STARS_SYMBOL));
}
if (!StringUtil.isEmpty(logQo.getTradeId())) {
queryBuilder.must(QueryBuilders.wildcardQuery(FileLogContants.TRADE_ID, FileLogContants.STARS_SYMBOL + logQo.getTradeId() + FileLogContants.STARS_SYMBOL));
}
if (!StringUtil.isEmpty(logQo.getBeginTime()) && !StringUtil.isEmpty(logQo.getEndTime())) {
// rangeQuery中参数加上FileLogContants.ES_KEYWORD_SYMBOL是防止es分词器进行分词影响时间范围查询
queryBuilder.must(QueryBuilders.rangeQuery(FileLogContants.MESSAGEDATE + FileLogContants.ES_KEYWORD_SYMBOL).from(logQo.getBeginTime()).to(logQo.getEndTime()));
}
int offset = logQo.getOffset();
SearchRequestBuilder builder = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
.setQuery(queryBuilder)
.setFrom(offset)
.setSize(logQo.getPageSize())
// 根据指定字段进行排序
.addSort(FileLogContants.TIMESTAMP, SortOrder.DESC)
//explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面
.setExplain(true);
List<FileLogDTO> result = new ArrayList<>();
SearchResponse searchResponse = builder.get();
for (SearchHit hit : searchResponse.getHits()) {
String messageJson = ObjectMapperUtil.writeValueAsString(hit.getSourceAsMap());
FileLogDTO fileLogDTO = ObjectMapperUtil.readValue(messageJson, FileLogDTO.class);
assert fileLogDTO != null;
fileLogDTO.setId(hit.getId());
result.add(fileLogDTO);
}
JSONObject object = new JSONObject();
object.put("file_log_list", result);
return object;
}
@Override
public long getTotalSize() {
// 查询总条数
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
SearchHits hits = transportClient.prepareSearch(elasticIndex).setTypes(elasticType)
.setQuery(boolQueryBuilder)
// 设置为0的时候即查询所有数据总数
.setSize(FileLogContants.TOTAL)
.get().getHits();
return hits.getTotalHits();
}
}
备注
- 日志系统密码设置
elk访问密码设置-参考