搭建flinkCDC采集mysql到doris环境

搭建flinkCDC采集mysql到doris环境

1. 搭建环境

第1步首先搭建平台包扩flink、flinkCDC、mysql、doris;

1.1 准备环境

  • 实验平台:VMware虚拟机CentOS8;

虚拟机需要最小6G内存+30G存储;关于linux系统存储扩容,可以参考LVM(logic volumn manager)系统

  • Docker: Docker version 24.0.1, build 6802122

  • jkd:orale的Java11;

  • ssh工具:MobaXterm;

准备Java

flink1.8依赖Java11,需要卸载linux centOS8自带的openJDK1.8;

使用yum remove命令卸载即可。yum list|grep jdk 列出Java安装目录,删除即可。

  1. 到oracle官网下载java11的.gz包(jdk-11.0.20_linux-x64_bin.tar.gz),上传解压,配置/etc/profile环境变量,如下追加到文件末尾即可。
JAVA_HOME=/root/java/jdk-11.0.20   # 本次项目Java安装目录,具体安装时需要根据实际情况修改。
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin</pre>

配置好上述文件保存退出执行如下命令,让配置生效

source /etc/profile

在执行查询Java版本

java -version

出现版本号,验证安装jdk成功。

1.2 安装flink:

准备 Flink Standalone 集群:到Fink官网下载最新版flink-1.18.0-bin-scala_2.12.tgz

上传到 Iinux虚拟机,解压之后配置环境变量,同jdk环境变量配置流程

配置好上述文件保存退出执行如下命令,让配置生效

source /etc/profile

进入到flink安装目录,执行./bin/start-cluster.sh启动flink,最后可以从web-ui页面上 查看flink作业情况:

地址:http://192.168.55.133:8081/

如果web页面无法访问,需要检查 flink的配置,如下0.0.0.0代表任意访问地址。

其他需要测试保存点,打开页面作业取消、提交入口等配置,也可以进入flink的配置目录flink-conf.yaml 进行配置。如下:

185 #==============================================================================
186 # Rest & web frontend
187 #==============================================================================
188
189 # The port to which the REST client connects to. If rest.bind-port has
190 # not been specified, then the server will bind to this port as well.
191 #
192 rest.port: 8081
193
194 # The address to which the REST client will connect to
195 #
196 rest.address: 0.0.0.0
197
198 # Port range for the REST and web server to bind to.
199 #
200 #rest.bind-port: 8080-8090
201
202 # The address that the REST & web server binds to
203 # By default, this is localhost, which prevents the REST & web server from
204 # being able to communicate outside of the machine/container it is running on.
205 #
206 # To enable this, set the bind address to one that has access to outside-facing
207 # network interface, such as 0.0.0.0.
208 #
209 rest.bind-address: 0.0.0.0
210
211 # Flag to specify whether job submission is enabled from the web-based
212 # runtime monitor. Uncomment to disable.
213
214 #web.submit.enable: false
215 web.submit.enable: true
216
217 # Flag to specify whether job cancellation is enabled from the web-based
218 # runtime monitor. Uncomment to disable.
219
220 web.cancel.enable: true
1705393613660.png

flink-web-ui访问不到

flink-web Ui访问问题异常

  1. 当我使用 Docker compose部署flink的时候,可以直接通过外部浏览器访问到服务器的 flink-webUI,但是当我使用本地部署,启动flink之后也无法通过外部浏览器访问到 ui界面。
  1. 最后查看防火墙状态systemctl status firewalld发现是由于防火墙没有关闭,之后关闭防火墙再次访问可以正常打开systemctl stop firewalld,或者防火墙开着,开放对应端口。具体操作见下面附件《防火墙操作》章节

2. Docker部署mysql和doris

docker的安装和使用这里省略。。。

这里给出运行mysql和doris的dockerfile

2.1 docker部署mysql5.7


docker run -id -p 3306:3306 \ 
-v /root/mysql/conf:/etc/mysql/mysql.conf.d/ \
-v /root/mysql/data:/var/lib/mysql \
-v /root/mysql/logs:/logs \
-e MYSQL_ROOT_PASSWORD=root \
--name mysql \
mysql:5.7

systemctl start docker 启动docker

启动mysql的binlog

创建mysql配置文件

vi /mydata/mysql/conf/my.cnf下的my.cnf文件

注意 :每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。

docker restart mysql #重启mysql docker exec -it mysql bin/bash #docker启动mysql客户端

mysql -u root -proot show variables like '%log_bin%; #查看是否生效

外部客户端navicate连接

正常连接即可,若不能正常连接,先开通防火墙端口3306,再执行如下操作

使用navicate连接报错解决,在服务器连接上mysql执行如下命令:

select user,plugin from user where user='root';

alter user 'root'@'%' identified with mysql_native_password by 'root';

flush privileges; </pre>

use mysql;

update user set host = '%' where user = 'root';

select host, user from user; </pre>

再次连接,成功

2.2 使用docker部署doris

最小实验环境(1Fe+1Be)

具体参考doris官网 Docker部署章节

运行doris容器前,做宿主机配置 由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令

运行doris容器

--name=fe \
--env FE_SERVERS="fe1:192.168.55.133:9010" \
--env FE_ID=1 \
-p 8030:8030 \
-p 9030:9030 \
-v /data/fe/doris-meta:/opt/apache-doris/fe/doris-meta \
-v /data/fe/log:/opt/apache-doris/fe/log \
-v /data/fe/conf:/opt/apache-doris/be/conf \
--net=host \
apache/doris:2.0.0_alpha-fe-x86_64
# -------------------------------------------------- #
docker run -itd \
--name=be \
--env FE_SERVERS="fe1:192.168.55.133:9010" \
--env BE_ADDR="192.168.55.133:9050" \
-p 8040:8040 \
-v /data/be/storage:/opt/apache-doris/be/storage \
-v /data/be/log:/opt/apache-doris/be/log \
-v /data/fe/conf:/opt/apache-doris/be/conf \
--net=host \
apache/doris:2.0.0_alpha-be-x86_64

通过mysql客户端连接到doris的fe后,将后端be添加到集群

docker部署host网络模式,无需添加

ALTER SYSTEM ADD BACKEND "be_host_ip:heartbeat_service_port";
ALTER SYSTEM ADD BACKEND "192.168.55.133:9050";

查看fe、be情况:

show frontends\G;

SHOW BACKENDS\G;

3. 安装flinkCDC

参考flinkCDC官网文档基于 Flink CDC 3.0 构建 MySQL 到 Doris 的 Streaming ELT — CDC Connectors for Apache Flink® documentation (ververica.github.io)

通过 FlinkCDC cli 提交任务

  1. 下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.0.0flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。

  2. 下载下面列出的 connector 包,并且移动到 lib 目录下 下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译

  3. 编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:

################################################################################
 # Description: Sync MySQL all tables to Doris
 ################################################################################
 source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC
 
 sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1
 
 pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
已经启动了flink任务之后(flink-web页面可以看到有 Available Task Slots ),配置好mysql-to-doris.yaml后启动flinkCDC,之后可以看到已经启动

bash bin/flink-cdc.sh mysql-to-doris.yaml

启动之后看到有一个正在运行的job两个task(启动的task数量取决于mysql-to-doris.yaml中配置的并行度,两者一致)

  1. mysql-to-doris.yaml讲解:

其中: source 中的 tables: app_db.\.* 通过正则匹配同步 app_db 下的所有表。 sink 添加 table.create.properties.replication_num 参数是由于 Docker 镜像中只有一个 Doris BE 节点。

  1. 最后,通过命令行提交任务到 Flink Standalone cluster

bash bin/flink-cdc.sh mysql-to-doris.yaml

提交成功后,返回信息如:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris 的任务正在运行。

更多高级用法

MySQL CDC 连接器 — CDC Connectors for Apache Flink® documentation (ververica.github.io)

附件一:防火墙操作

不关闭防火墙,但是在防火墙上永久开放端口号

  1. 启动防火墙:

sudo systemctl start firewalld

  1. 永久开放8081flink-web端口和8030Doris-web端口和数据库连接端口9030:

sudo firewall-cmd --zone=public --add-port=8081/tcp --permanent

sudo firewall-cmd --zone=public --add-port=8030/tcp --permanent

sudo firewall-cmd --zone=public --add-port=9030/tcp --permanent

重新加载防火墙配置:

sudo firewall-cmd --reload
这些命令将启动防火墙并允许8081和8030端口的流量通过防火墙。

查看已经在防火墙中开放了哪些端口,可以使用以下命令:

sudo firewall-cmd --list-ports

这个命令将列出防火墙中已经开放的所有端口。如果你想查看特定服务的端口是否已经开放,可以使用以下命令:

sudo firewall-cmd --list-ports | grep <service_name>

将 `` 替换为你想要查找的服务的名称,例如sshhttp 等。

另外,如果你想查看所有已经开放的服务,可以使用以下命令:

sudo firewall-cmd --list-services

这个命令将列出防火墙中已经开放的所有服务。

疑问

以上实现了基础核心采集功能

  1. 断点续传,flink服务挂掉重启:如何保障?

    1. 应该需要检查点和保存点
  2. 如果mysql增加了新表,如何通过flinkCDC同步新增表到doris?

  3. 检查点的类型选择如何定?state.backend.type: hashmap、rocksdb、其他

检查点的操作说明:

  1. 确定作业ID:首先,你需要确定你想要创建 Savepoint 的 Flink 作业的作业ID。你可以在 Flink 的 Web UI 或者命令行中找到作业的作业ID。
  1. 使用命令行工具创建 Savepoint:使用 flink savepoint 命令来创建 Savepoint。命令的基本格式如下:

./bin/flink savepoint <jobID> [targetDirectory]

其中,`` 是你要创建 Savepoint 的作业的作业ID,[targetDirectory] 是可选的目标目录,用于指定 Savepoint 的保存路径。如果不指定目标目录,Flink 将会自动选择一个默认的保存路径。

例如,如果你的作业ID是 e2b5e0c7f3b7d94d9c5c1a7e60d7e0a1,你可以使用以下命令来创建 Savepoint:

./bin/flink savepoint e2b5e0c7f3b7d94d9c5c1a7e60d7e0a1 /path/to/savepoint/directory

  1. 等待 Savepoint 创建完成:执行创建 Savepoint 的命令后,Flink 会开始创建 Savepoint。你需要等待一段时间,直到 Savepoint 创建完成。你可以在 Flink 的 Web UI 中或者命令行中查看 Savepoint 的创建进度和状态。
  1. 确认 Savepoint 创建完成:一旦 Savepoint 创建完成,你可以在指定的目标目录中找到 Savepoint 文件,确认 Savepoint 创建成功。

通过上述步骤,你就可以手动创建一个 Flink Savepoint。创建成功后,你可以使用这个 Savepoint 来恢复应用程序的状态,或者在升级、迁移应用程序时使用。

希望这个步骤能够帮助你手动创建 Flink 的 Savepoint。如果你有其他问题,欢迎随时提问。

指定saveponit启动作业:

在 Flink 中,你可以在启动作业时指定要使用的 Savepoint。这样,Flink 会从指定的 Savepoint 中恢复应用程序的状态,并从上次保存的状态开始处理数据。下面是如何在启动作业时指定 Savepoint 的步骤:

  1. 确定要使用的 Savepoint:首先,你需要确定你要使用的 Savepoint 文件的路径。你可以选择之前手动创建的 Savepoint,或者使用 Flink 自动保存的 Savepoint。
  1. 使用命令行工具启动作业:使用 flink run 命令来启动作业,并在命令中指定要使用的 Savepoint。命令的基本格式如下:

./bin/flink run -s :savepointPath :jobFile

其中,:savepointPath 是你要使用的 Savepoint 文件的路径,:jobFile 是你要运行的作业文件。

例如,如果你想要从 /path/to/savepoint/directory/savepoint-1234 这个 Savepoint 启动作业,你可以使用以下命令:

./bin/flink run -s file:///path/to/savepoint/directory/savepoint-1234 /path/to/your/job.jar

  1. 等待作业启动:执行启动作业的命令后,Flink 会从指定的 Savepoint 中启动作业。你需要等待一段时间,直到作业成功启动。

通过上述步骤,你就可以在启动作业时指定使用的 Savepoint。这样,Flink 将会从指定的 Savepoint 中恢复应用程序的状态,并继续处理数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,478评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,825评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,482评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,726评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,633评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,018评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,513评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,168评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,320评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,264评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,288评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,995评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,587评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,667评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,909评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,284评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,862评论 2 339

推荐阅读更多精彩内容