同步业务MySQL数据到实时仓库ClickHouse的数据流转过程如下:
上述在大数据MySQL中转数据的目的,是为了精准同步某张表的数据到大数据平台,避免资源的浪费,否则ClickHouse通过MaterializeMySQL引擎直接创建业务库对应的数据库,会将整个业务库的表数据都拉取到大数据平台,不论该库的表是否有使用到。
以下通过业务表test.test同步的配置举例说明。
1 CanalServer配置源库表链接参数
#在CanalServer的Conf目录下复制example文件夹并命名为新的文件夹test,修改其中文件instance.properties,修改以下参数即可,包括链接源库地址,端口,用户名和密码,以及同步的库表名,保存:
canal.instance.master.address=业务库ip:3306
canal.instance.dbUsername=username
canal.instance.dbPassword=password
canal.instance.filter.regex=test\\.test
#同时,修改canal.properties文件,修改以下参数即可,增加新配置的test,用逗号分隔:
canal.destinations = example,test
启动CanalServer,sh bin/startup.sh,可通过查看日志判断是否正常启动。
当新增相同MySQL下库表的同步时,调整过滤规则即可,比如如下设置,表示同步test库下的test和test2两张表。
canal.instance.filter.regex=test\\.test,test\\.test2
当新增不同Mysql下库表的同步时,同样复制example文件夹进行配置即可。
2 CanalAdapter配置目标库表及映射
#进入CanalAdaper的Conf目录下,先配置application.yml文件。
#application.yml文件修改以下参数即可,包括tomcat的port值,canalServer访问地址和端口号,源库表连接方式,目标库表连接方式, canalServer中的实例名称(destination名称),key值,name值指定为rdb(表示同步到关系型数据库):
server:
port: 10005 #默认为8081,如果发生冲突可修改。
canal.conf:
mode: tcp # kafka rocketMQ
canalServerHost: CanalServerIP:11111
srcDataSources:
defaultDS:
url: jdbc:mysql://业务库ip:3306/test?useUnicode=true
username: username
password: password
canalAdapters:
- instance: test # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://大数据MySQL:3306/ods?useUnicode=true
jdbc.username: username
jdbc.password: password
# 进入conf目录下的rdb目录,修改mytest_user.yml文件为test.yml,并配置参数如下:
dataSourceKey: defaultDS
destination: test
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: test
table: test
targetTable: ods.test
targetPk:
id: id
mapAll: true
启动CanalAdapter,sh startup.sh,查看日志确定运行状态。
当新增相同MySQL下库表的同步时,在rdb文件夹下增加对应的yml文件即可。
当新增不同Mysql下库表的同步时,需要增加适配器,即按照上述配置CanalAdapter的方式再配置一套适配器。
3 大数据MySQL设置
在启动CanalAdapter之前,目标库和表必须是存在的。规定需同步到大数据的业务表全部入库到ods库中,表名以 原库名_原表名的方式命名,比如源库test的test表同步到ods库且命名为test,并创建和源表相同的表结构。
为实现ClickHouse通过MaterializeMySQL引擎的方式实时同步MySQL数据,要求Mysql的相关参数配置如下,且Mysql版本不得低于5.6:
gtid-mode=on
enforce-gtid-consistency=1 # 设置为主从强一致性
log-slave-updates=1 # 记录日志
default_authentication_plugin=mysql_native_password
binlog-format=ROW
注:这里没有采用Canal的库间镜像方式(实现了DDL语句的同步)的方式自动创建表,是为了避免锁表或其他DDL语句时序错误导致意外故障发生。官网不建议线上生产环境开启DDL语句的同步。
4 ClickHouse实时同步MySQL数据
如下,在ClickHouse中使用MaterializeMySQL引擎方式创建ods库,实时同步MySQL ods库的表数据。
create database ods ENGINE = MaterializeMySQL('大数据MySQLIP:3306', 'ods', 'username', 'password')
5 全量数据导入
上述仅说明了增量同步的方式,而Canal实现全量同步数据,需要通过手动调用接口实现,命令如下:
curl http://127.0.0.1:10005/etl/rdb/mysql1/test.yml -X POST
#10005表示Adapter配置的application.yml中配置的tomcat的port值
#mysql1表示Adapter配置的application.yml中配置的outerAdapters中的参数key值
#test.yml表示要同步的表对应的yml文件,在Adatper home的conf/rdb目录下
上述全量同步过程中,如果数据量比较大,需要耐心等待一段时间。
注意事项:
1)MaterializeMySQL引擎创建的表不允许使用如下方式将数据导入:
insert into test select * from mysql('IP:3306', 'ods', 'test','username', 'password')
2)案例中所用MySQL版本为5.7.18, Canal版本为1.4.1,ClickHouse版本为20.10.2.20。