前言
上一篇 基于docker部署的微服务架构(七): 部署ELK日志统计分析系统 中,已经把日志数据输出到 elasticsearch 并用 kibana 做展现。
实际项目中还有把日志数据保存到数据库,做进一步分析的需求。由于数据分析的需求有可能变动(例如:可能一开始只需要统计服务使用总量,后来随着业务的扩展,需要按不同地区或不同服务进行统计),保存日志的数据结构也会相应的变动,如果使用传统的关系型数据库,就需要对表结构进行修改,一般日志的数据量会很大,修改表结构很耗时,对开发也不友好,这时候应该考虑 nosql 数据库。
从 nosql 的使用情况来看,MongoDB 是使用最多的,一方面由于良好的性能,尤其在收购了 wiredtiger 引擎之后,提供了文档级锁,写入性能大大提高;另一方面就是 MongoDB 比较容易上手,hbase 依赖 hadoop 环境,部署起来比较麻烦,MongoDB 就很简单了。
本文将会介绍,在 docker 环境下搭建 MongoDB,搭建 MongoDB 的web客户端 mongo-express,MongoDB 简单使用,使用 spring kafka 创建 kafka 消费者接受消息,以及使用 spring data 操作 MongoDB。
MongoDB的优缺点
先说优点,nosql 数据库的优点, MongoDB 都具备:高扩展性、高性能、松散的数据结构、天然支持分片和集群等等。
MongoDB 在此之上还提供了非常丰富的查询功能,不像 hbase 只能全表或 row key 查询。MongoDB 还提供了二级索引,并且还支持 MapReduce。MongoDB 还提供了一个分布式文件系统 GridFS,用来存储超过16MB的数据。
缺点,不支持事务,这是 nosql 数据库的通病,也是 nosql 的基因所致。nosql 从诞生就 不是为了 处理结构化的强一致性数据的。
像日志数据这种,结构松散、不要求一致性、数据量大的数据,保存到 MongoDB 是个不错的选择。
在docker环境中部署MongoDB
登录 docker 节点,运行 docker pull mongo:3.2.11
下载目前最新的 MongoDB 镜像(建议在拉取镜像时使用具体的版本号,不要用 latest,避免版本兼容的问题,也更清楚具体用的哪个版本)。
创建数据挂载卷,mkdir -p /mongodb/data
。
启动容器
docker run -d --name mongodb --volume /mongodb/data:/data/db \
--publish 27017:27017 \
mongo:3.2.11
运行 docker exec -it mongodb mongo
进入 Mongo shell,运行 show dbs
查看当前所有的数据库。
简单介绍下 MongoDB 的基础概念:
- database 和 mysql 类似,表示数据库
- collection 相当于 mysql 中的表,用来存放数据,不同的是 collection 不需要定义表结构
- document 相当于 mysql 表中保存的一条数据,BSON格式,BSON类似于JSON
在 Mongo shell 中运行 use add-service-demo-log
,创建一个 add-service-demo-log 数据库,并切换到了这个数据库。
这时候运行 show dbs
并没有显示 add-service-demo-log 数据库,因为新建的这个数据库中没有 collection,新建一个 collection,db.createCollection('addLog')
。
再运行 show dbs
就可以看到 add-service-demo-log 数据库了。
运行 db
,查看当前处于哪个数据库。
运行 show collections
查看数据库中所有的 collection。
在docker环境中部署mongo-express
在使用 MongoDB 开发时,通常需要一个客户端工具帮助我们操作 MongoDB,有很多优秀的客户端工具可以选择。这里我们部署一个web端的客户端工具 mongo-express,web工具的好处就是只要部署在服务端,所有开发人员都可以使用。
运行 docker pull mongo-express:0.32.0
,下载镜像。
启动容器
docker run -d --name mongo-express --link mongodb:mongo \
--publish 8081:8081 \
mongo-express:0.32.0
在 --link
的时候给 mongodb 一个别名:mongo,因为 mongo-express 默认的MongoDB server是 mongo,这样的话就不用指定 ME_CONFIG_MONGODB_SERVER 环境变量了。
启动完成之后访问 http://宿主机IP:8081,打开 mongo-express 的页面。
修改add-service-demo把日志发送到kafka的add-log topic
修改 log4j2.xml 配置文件,新增一个 kafka appender ,日志的输出格式使用 JsonLayout
:
<Kafka name="addLog" topic="add-log">
<JsonLayout complete="false" compact="true"/>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
把日志数据输出到 kafka 的 add-log topic 下。对这个 appender 进行异步包装:
<Async name="addLogAsync">
<AppenderRef ref="addLog"/>
</Async>
最后增加一个 Logger,使用之前配置的异步 appender:
<Logger name="addLogger" level="info">
<appender-ref ref="addLogAsync"/>
</Logger>
修改完之后 log4j2.xml 的文件内容:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Properties>
<Property name="logFormat">
%d{yyyy-MM-dd HH:mm:ss.SSS}{GMT+8} [@project.artifactId@] [%thread] %-5level %logger{35} - %msg %n
</Property>
<Property name="kafkaBootstrapServers">
@kafka.bootstrap.servers@
</Property>
</Properties>
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="${logFormat}"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/@project.artifactId@.log"
filePattern="logs/@project.artifactId@.%d{yyyy-MM-dd}.log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy/>
</Policies>
</RollingFile>
<Kafka name="basicLog" topic="basic-log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
<Kafka name="addLog" topic="add-log">
<JsonLayout complete="false" compact="true"/>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
<Async name="FileAsync">
<AppenderRef ref="RollingFile"/>
</Async>
<Async name="basicLogAsync">
<AppenderRef ref="basicLog"/>
</Async>
<Async name="addLogAsync">
<AppenderRef ref="addLog"/>
</Async>
</Appenders>
<Loggers>
<Logger name="org.apache.kafka" level="info"/>
<Logger name="addLogger" level="info">
<appender-ref ref="addLogAsync"/>
</Logger>
<Root level="info">
<AppenderRef ref="STDOUT"/>
<AppenderRef ref="FileAsync"/>
<AppenderRef ref="basicLogAsync"/>
</Root>
</Loggers>
</Configuration>
在 AddController.java 中使用 addLogger 输出日志:
@RestController
@RefreshScope
public class AddController {
private static final Logger addLogger = LoggerFactory.getLogger("addLogger");
@Value("${my.info.str}")
private String infoStr;
@RequestMapping(value = "/add", method = RequestMethod.GET)
public Map<String, Object> add(Integer a, Integer b) {
System.out.println("端口为8100的实例被调用");
System.out.println("infoStr : " + infoStr);
Map<String, Object> returnMap = new HashMap<>();
returnMap.put("code", 200);
returnMap.put("msg", "操作成功");
Integer result = a + b;
returnMap.put("result", result);
addLogger.info("a : " + a + ", b : " + b + ", a + b :" + result);
return returnMap;
}
}
这样在调用 AddController 中的 add 方法时,会输出一条日志到 kafka 的 add-log topic 中,只要创建一个消费者订阅 add-log,把日志数据保存到 MongoDB 即可。
创建日志消费者
新建一个 maven 项目,修改 pom.xml 增加需要的依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<!-- 指定java版本 -->
<java.version>1.8</java.version>
<!-- 镜像前缀,推送镜像到远程库时需要,这里配置了一个阿里云的私有库 -->
<docker.image.prefix>
registry.cn-hangzhou.aliyuncs.com/ztecs
</docker.image.prefix>
<!-- docker镜像的tag -->
<docker.tag>demo</docker.tag>
<!-- 激活的profile -->
<activatedProperties></activatedProperties>
<kafka.bootstrap.servers>10.47.160.238:9092</kafka.bootstrap.servers>
</properties>
<profiles>
<!-- docker环境 -->
<profile>
<id>docker</id>
<properties>
<activatedProperties>docker</activatedProperties>
<docker.tag>docker-demo-${project.version}</docker.tag>
<kafka.bootstrap.servers>kafka:9092</kafka.bootstrap.servers>
</properties>
</profile>
</profiles>
<build>
<defaultGoal>install</defaultGoal>
<finalName>${project.artifactId}</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<!-- 配置spring boot maven插件,把项目打包成可运行的jar包 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<executable>true</executable>
</configuration>
</plugin>
<!-- 打包时跳过单元测试 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- 配置docker maven插件,绑定install生命周期,在运行maven install时生成docker镜像 -->
<plugin>
<groupId>com.spotify</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.4.13</version>
<executions>
<execution>
<phase>install</phase>
<goals>
<goal>build</goal>
<goal>tag</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- 修改这里的docker节点ip,需要打开docker节点的远程管理端口2375,
具体如何配置可以参照之前的docker安装和配置的文章 -->
<dockerHost>http://docker节点ip:2375</dockerHost>
<imageName>${docker.image.prefix}/${project.build.finalName}</imageName>
<baseImage>java</baseImage>
<!-- 这里的entryPoint定义了容器启动时的运行命令,容器启动时运行
java -jar 包名 , -Djava.security.egd这个配置解决tomcat8启动时因为需要收集环境噪声来生成安全随机数导致启动过慢的问题-->
<entryPoint>["java", "-Djava.security.egd=file:/dev/./urandom", "-jar", "/${project.build.finalName}.jar"]
</entryPoint>
<resources>
<resource>
<targetPath>/</targetPath>
<directory>${project.build.directory}</directory>
<include>${project.build.finalName}.jar</include>
</resource>
</resources>
<image>${docker.image.prefix}/${project.build.finalName}</image>
<newName>${docker.image.prefix}/${project.build.finalName}:${docker.tag}</newName>
<forceTags>true</forceTags>
<pushImage>false</pushImage>
</configuration>
</plugin>
</plugins>
</build>
看过之前文章的朋友应该对这些配置比较熟悉了,和其他项目不同的是,这里引入了 spring-kafka 、jackson、 spring-boot-starter-data-mongodb 。
使用 spring-kafka 创建 kafka 订阅者,使用 jackson 对日志数据进行转换,使用 spring-boot-starter-data-mongodb 完成 MongoDB 的相关操作。
在 resources 目录下创建 bootstrap.yml
,因为配置信息是从 config-server 中获取的,所以 bootstrap.yml
的内容和其他项目一样:
spring:
application:
name: @project.artifactId@
profiles:
active: @activatedProperties@
cloud:
config:
profile: dev
label: master
discovery:
enabled: true
serviceId: CONFIG-SERVER-DEMO
failFast: true
retry:
initialInterval: 10000
multiplier: 2
maxInterval: 60000
maxAttempts: 10
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8000/eureka/
在git仓库中创建配置文件 log-persist-demo-dev.yml ,:
spring:
rabbitmq:
host: 10.47.160.238
port: 5673
username: guest
password: guest
data:
mongodb:
uri: mongodb://10.47.160.114:27017/add-service-demo-log
kafka:
bootstrapServers: 10.47.160.114:9092
groupId: mongo
enableAutoCommit: true
autoCommitIntervalMs: 100
sessionTimeOutMs: 15000
创建 log4j2.xml 配置文件,内容也和其他项目相同:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Properties>
<Property name="logFormat">
%d{yyyy-MM-dd HH:mm:ss.SSS}{GMT+8} [@project.artifactId@] [%thread] %-5level %logger{35} - %msg %n
</Property>
<Property name="kafkaBootstrapServers">
@kafka.bootstrap.servers@
</Property>
</Properties>
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="${logFormat}"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/@project.artifactId@.log"
filePattern="logs/@project.artifactId@.%d{yyyy-MM-dd}.log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy/>
</Policies>
</RollingFile>
<Kafka name="basicLog" topic="basic-log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
<Async name="FileAsync">
<AppenderRef ref="RollingFile"/>
</Async>
<Async name="basicLogAsync">
<AppenderRef ref="basicLog"/>
</Async>
</Appenders>
<Loggers>
<Logger name="org.apache.kafka" level="info"/>
<Root level="info">
<AppenderRef ref="STDOUT"/>
<AppenderRef ref="FileAsync"/>
<AppenderRef ref="basicLogAsync"/>
</Root>
</Loggers>
</Configuration>
创建一个 demo
包,在 demo
包下创建启动入口 LogPersistDemoApplication.java
:
@SpringBootApplication
public class LogPersistDemoApplication {
public static void main(String[] args) {
SpringApplication.run(LogPersistDemoApplication.class, args);
}
}
在 demo
下创建一个子包 config
,用来存放 java config 配置。创建 KafkaConfig.java 配置 kafka :
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrapServers}")
private String bootstrapServers;
@Value("${kafka.groupId}")
private String groupId;
@Value("${kafka.enableAutoCommit}")
private Boolean enableAutoCommit;
@Value("${kafka.autoCommitIntervalMs}")
private Integer autoCommitIntervalMs;
@Value("${kafka.sessionTimeOutMs}")
private Integer sessionTimeOutMs;
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOutMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
简单看下 KafkaConfig.java ,在类级别使用 @EnableKafka
让 spring boot 自动帮我们初始化 kafka 相关的bean。配置类中用到的配置信息使用 @Value
从 log-persist-demo-dev.yml 读取数据,并配置了 kafkaListenerContainerFactory
,用来创建 kafka 消费者。
在 config
包下创建 MongoConfig.java
配置 MongoDB :
@Configuration
public class MongoConfig {
@Value("${spring.data.mongodb.uri}")
private String mongoClientUri;
@Bean
public MongoDbFactory mongoDbFactory() {
SimpleMongoDbFactory mongoDbFactory = null;
try {
mongoDbFactory = new SimpleMongoDbFactory(new MongoClientURI(mongoClientUri));
mongoDbFactory.setWriteConcern(WriteConcern.UNACKNOWLEDGED);
} catch (UnknownHostException e) {
e.printStackTrace();
}
return mongoDbFactory;
}
}
这里配置了 MongoClientURI
,并且设置了写安全级别(WriteConcern)为 UNACKNOWLEDGED,关于 MongoDB 写安全级别,简单介绍:
- Errors Ignored(-1) 忽略所有异常,包括网络异常。
- Unacknowledged(0) 忽略写入异常,但是会检测网络异常。
- Acknowledged(1) 默认级别,可以捕获到写入异常。 MongoDB 保存数据时,先把数据写入内存,定期 fsync 保存到硬盘,如果数据写入内存之后没来得及写入硬盘,服务挂了,数据就丢了。
- Journaled(1, journal=true) 增加 journal 日志,数据写入内存的同时记录日志,服务down了可以通过 journal 日志还原操作
- majority(>1) 在副本集模式下,保证多数节点(超过半数)数据写入。
从上到下,安全级别由低到高,写入效率由高到低。由于记录的是日志数据,数据量大,对写入效率要求较高,并且允许部分数据丢失,所以配置了 Unacknowledged 级别,最低级别会忽略网络异常,一般不建议使用。
在 demo
包下创建一个 model
子包,用来存放数据模型。创建 AddLog.java
数据模型:
public class AddLog {
@Id
private String id;
private Long timeMillis;
private String thread;
private String level;
private String loggerName;
private String message;
private Boolean endOfBatch;
private String loggerFqcn;
private Integer threadId;
private Integer threadPriority;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimeMillis() {
return timeMillis;
}
public void setTimeMillis(Long timeMillis) {
this.timeMillis = timeMillis;
}
public String getThread() {
return thread;
}
public void setThread(String thread) {
this.thread = thread;
}
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
public String getLoggerName() {
return loggerName;
}
public void setLoggerName(String loggerName) {
this.loggerName = loggerName;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Boolean getEndOfBatch() {
return endOfBatch;
}
public void setEndOfBatch(Boolean endOfBatch) {
this.endOfBatch = endOfBatch;
}
public String getLoggerFqcn() {
return loggerFqcn;
}
public void setLoggerFqcn(String loggerFqcn) {
this.loggerFqcn = loggerFqcn;
}
public Integer getThreadId() {
return threadId;
}
public void setThreadId(Integer threadId) {
this.threadId = threadId;
}
public Integer getThreadPriority() {
return threadPriority;
}
public void setThreadPriority(Integer threadPriority) {
this.threadPriority = threadPriority;
}
}
创建数据访问层 AddLogDao.dao
,使用 spring data 对 MongoDB 的封装 :
@Repository
public interface AddLogDao extends MongoRepository<AddLog, String> {
}
创建 KafkaConsumer.java
接受 kafka 消息:
@Component
public class KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@Autowired
private AddLogDao addLogDao;
@KafkaListener(topics = {"add-log"})
public void receivePersistLog(String data) {
logger.info("接收到需要保存到MongoDB的日志数据, data : " + data);
ObjectMapper objectMapper = new ObjectMapper();
try {
AddLog addLog = objectMapper.readValue(data, AddLog.class);
addLogDao.save(addLog);
logger.info("成功保存日志数据, data : " + data);
} catch (IOException e) {
e.printStackTrace();
}
}
}
这里使用 @KafkaListener(topics = {"add-log"})
接受 add-log topic 的消息,把收到的消息保存到 MongoDB。
运行 LogPersistDemoApplication.java
的 main
方法启动,访问 add-service-demo 提供的 add
接口,会在 MongoDB 中插入一条日志记录。
使用docker-maven-plugin打包并生成docker镜像
这部分内容和前面几篇文章基本相同,都是把容器间的访问地址和 --link
参数对应,不再赘述。
最后
本文简单介绍了 MongoDB 的相关内容, 以及使用 spring kafka 接受kafka消息,并把数据插入 MongoDB。