一:在linux环境下安装部署好mysql
1 :开启binlog
sudo vi /etc/my.cnf (Mysql的配置文件)
2: mysql的binlog格式有3种,为了把binlog解析成json数据格式,要设置binlog的格式为row(binlog有三种格式:Statement、Row以及Mixed)
server-id=1
log-bin=/data/mysql/log/binlog(这一步开启binlog)
binlog_format=row
具体如下图:
3:重启msyql服务
sudo service mysqld restart
4:查看是否已经开启binlog
Mysql>show variables like '%log_bin%';
此时在/data/mysql/log/binlog目录下可以看到生成了相应的binlog监听日志文件,如图,binlog.000004文件,每次重启msyql服务,就会生成一个新的监听文件,这里日志文件名称跟步骤2中配置的log-bin=/data/mysql/log/binlog有关,如果log-bin=master配置这样,那么日志文件名称就是master.000004。
二:配置Maxwell相关的部署工作
1:下载Maxwell
官网:http://maxwells-daemon.io/
组件下载链接
https://github.com/zendesk/maxwell/releases/download/v1.11.0/maxwell-1.11.0.tar.gz
2:上传maxwell-1.11.0.tar.gz
通过winscp等FTP工具把maxwell-1.11.0.tar.gz到/usr/local/maxwell/ 目录下
3:解压maxwell-1.11.0.tar.gz
tar -zxvf maxwell-1.11.0.tar.gz
4:创建maxwell使用的mysql用户并赋权
mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';
以上图片为官网参考
以我自己的为例:
GRANT ALL on *.* to 'user01'@'%' identified by 'test123';
把所有数据库的所有表授权给user01用户以密码test123登录
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'user01'@'%';
flush privileges;
(mysql 新设置用户或更改密码后需用flush privileges刷新MySQL的系统权限相关表,否则会出现拒绝访问,还有一种方法,就是重新启动mysql服务器,来使新设置生效。)
4::开启maxwell命令行(注意,如果没有设置,maxwell默认是把监听的mysql的binlog日志发送到kafka的主题叫maxwell的topic上的)
bin/maxwell --user='maxwell' --password='xxxx' --host='数据库IP地址' --producer=kafka --kafka.bootstrap.servers=bi-master:9092
解释:数据库IP地址参数是安装mysql的那台主机,最后的kafka.bootstrap.servers是安装kafka集群的节点主机名(最好不要用IP地址)和端口号。
三:kafka相关配置
说明(以下我的kafka是安装在主机名叫bi-master,注意kafka里的配置文件端口号要和命令行里给的端口号一致)
1:首先启动zookeeper
$sbin/zkServer.sh start
2:开启kafka命令行
bin/kafka-server-start.shconfig/server.properties
3:在kafka中创建一个主题叫maxwell以便于接受数据
bin/kafka-topics.sh--create --zookeeper bi-master:2181 --replication-factor 1 --partitions 1 --topic maxwell
4:启动kafka消息生产者窗口
bin/kafka-console-producer.sh --broker-list bi-master:9092 --topic maxwell
5:启动kafka消息消费者窗口
bin/kafka-console-consumer.sh --zookeeper bi-master:2181 --topic maxwell --from-beginning
四:测试
1:在Mysql客户工具 中添加一个数据
消费窗口接收到的消息:
{"database":"binlogTest","table":"myTest","type":"insert","ts":1515494531,"xid":7693,"commit":true,"data":{"name":"小梅","sex":"女","age":18,"address":"深圳市南山区海岸城"}}
2:修改一条数据:
消费窗口接收到的消息:
{"database":"binlogTest","table":"myTest","type":"update","ts":1515494707,"xid":7756,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田区"},"old":{"sex":"女","address":"深圳市南山区海岸城"}}
3:删除一条数据
消费窗口接收到的消息:
{"database":"binlogTest","table":"myTest","type":"delete","ts":1515494807,"xid":7799,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田区"}}
五:通过java程序去消费kafka消息数据
1:jar包添加
2:代码实现
第一种实现:
第二种实现: