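Flink JobManager High Availability (HA)
Overview
This article explains how to set up JobManager high availability (HA) for Flink in standalone mode and for Flink on YARN.
JobManager High Availability (HA)
The JobManager coordinates every Flink job deployment. It is responsible for task scheduling and resource management.
By default, each Flink cluster has a single JobManager, which makes it a single point of failure (SPOF): if the JobManager crashes, no new jobs can be submitted and the running programs fail as well.
With JobManager HA, the cluster can recover from a JobManager failure, which removes the SPOF. HA can be configured for both standalone and YARN clusters.
JobManager HA configuration steps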
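- High availability for a standalone cluster
The basic idea of JobManager HA in standalone mode is that at any time there is one Master JobManager and several Standby JobManagers. If the Master JobManager crashes, a Standby JobManager takes over the cluster and becomes the new Master. This removes the single point of failure, and programs can continue to run as soon as a Standby JobManager has taken over. There is no explicit difference between Standby and Master JobManager instances: every JobManager can act as either Master or Standby.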
- High availability for a YARN cluster
HA for Flink on YARN mainly relies on YARN's own job recovery mechanism.
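HA configuration for a Flink standalone cluster
1. HA cluster environment planning
Two nodes are used to build a cluster with two masters and two workers (each node runs both a JobManager and a TaskManager).
Note:
To enable JobManager HA, you must set the high-availability mode to zookeeper, configure a ZooKeeper quorum, and provide a masters file that lists every JobManager hostname together with its web UI port.
Flink uses ZooKeeper for distributed coordination between the running JobManager instances. ZooKeeper is a service separate from Flink that provides highly reliable distributed coordination through leader election and lightweight consistent state storage.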
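The masters file mentioned in the note is not shown in this walkthrough. As a minimal sketch, the helper below prints one host:port line per JobManager, which is the layout the note describes. The function name masters_lines and the port 8081 are illustrative assumptions, not values taken from this setup; use the web UI port of your own cluster.

```shell
# Print the lines of a conf/masters file: one "host:webUiPort" entry per JobManager.
# Usage: masters_lines PORT HOST [HOST...]
masters_lines() {
  port=$1
  shift
  for host in "$@"; do
    printf '%s:%s\n' "$host" "$port"
  done
}

# Example (8081 is an assumed port):
# masters_lines 8081 data-hadoop-50-63.xxx.com data-hadoop-50-64.xxx.com > conf/masters
```

Generating the file from a host list keeps the entries consistent when the same configuration is copied to every node.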
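2. Configure and start
The configuration is identical on every node in the cluster, so configure the first machine (50.63) and then apply the same settings on 50.64.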
ssh data-hadoop-50-63.xxx.com
Standalone configuration
# First, set the parameters as in the earlier standalone setup
vi conf/flink-conf.yaml
jobmanager.rpc.address: data-hadoop-50-63.xxx.com
vi conf/slaves
data-hadoop-50-63.xxx.com
data-hadoop-50-64.xxx.com
HA configuration
[iknow@data-hadoop-50-63 flink-1.7.2]$ cat conf/slaves
#localhost
data-hadoop-50-63.xxx.com
data-hadoop-50-64.xxx.com
Relevant entries in conf/flink-conf.yaml:
#jobmanager.rpc.address: localhost
jobmanager.rpc.address: data-hadoop-50-63.xxx.com
#jobmanager.rpc.address: 192.168.50.63
# high-availability: zookeeper
high-availability: zookeeper
# Root ZooKeeper node, under which the namespaces of all cluster nodes are placed
high-availability.zookeeper.path.root: /flink-standalone-ha
# high-availability.storageDir: hdfs:///flink/ha/
# Cluster-id ZooKeeper node, under which all coordination data required by the cluster is placed
high-availability.cluster-id: /cluster_one
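# Use a fully qualified HDFS path. If a Flink node has no HDFS configuration, a path that is not fully qualified cannot be resolved.
# storageDir holds all the metadata needed to recover a JobManager.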
high-availability.storageDir: hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-standalone-ha
# high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.quorum: data-hadoop-50-63.xxx.com:2181
#rest.port: 8081
rest.port: 18081
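Since the configuration above mixes commented-out defaults with active settings, a quick check that the active keys are really present can save a failed start. The sketch below looks only for the three HA keys used in this article. The function name missing_ha_keys is hypothetical, and the check assumes that active settings start at the beginning of a line.

```shell
# Report which of the HA keys used in this article are missing from a flink-conf.yaml.
# Prints one missing key per line; returns 0 only when none is missing.
missing_ha_keys() {
  conf=$1
  missing=0
  for key in high-availability \
             high-availability.storageDir \
             high-availability.zookeeper.quorum; do
    # Match only active "key: value" lines; commented-out lines start with '#'.
    if ! grep -q "^${key}:" "$conf"; then
      echo "$key"
      missing=1
    fi
  done
  return "$missing"
}

# Example: missing_ha_keys conf/flink-conf.yaml
```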
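Start the services
Start the Hadoop services first
sbin/start-all.sh
Then start the ZooKeeper service
bin/zkServer.sh start
Start the Flink standalone HA cluster by running the following command on node 50.63
bin/start-cluster.sh
Check the result in ZooKeeper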
./bin/zkCli.sh
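To know where to look inside the ZooKeeper shell, it helps to combine high-availability.zookeeper.path.root and high-availability.cluster-id into a single znode path. The sketch below does only that string join. The function name ha_znode is hypothetical, and the assumption that Flink places the cluster node directly under the root node follows from the comments in the configuration above.

```shell
# Join high-availability.zookeeper.path.root and high-availability.cluster-id
# into one znode path, collapsing repeated slashes.
ha_znode() {
  printf '%s/%s\n' "$1" "$2" | sed 's://*:/:g'
}

# Example: ha_znode /flink-standalone-ha /cluster_one
# Inside zkCli.sh the resulting path can then be listed with the ls command.
```

Collapsing the slashes matters here because cluster-id is configured with a leading slash, so a naive join yields a double slash.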
Output of starting the HA cluster
[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host data-hadoop-50-63.bjrs.zybang.com.
Starting standalonesession daemon on host data-hadoop-50-64.bjrs.zybang.com.
Starting taskexecutor daemon on host data-hadoop-50-63.bjrs.zybang.com.
Starting taskexecutor daemon on host data-hadoop-50-64.bjrs.zybang.com.
3. Verify the HA cluster processes
Processes started on 50.63
Processes started on 50.64
The Flink web UI did not come up on port 18081.
The log on 50.63 shows that the web UI started on port 45141:
2019-02-25 10:48:48,787 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at data-hadoop-50-63.bjrs.com:45141
2019-02-25 10:48:48,813 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Web frontend listening at http://data-hadoop-50-63.bjrs.zybang.com:45141.
2019-02-25 10:48:48,851 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://data-hadoop-50-63.bjrs.zybang.com:45141 was granted leadership with leaderSessionID=3a3047aa-bdb8-4234-9e1a-d3900081f169
2019-02-25 10:48:48,878 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2019-02-25 10:48:48,911 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher
The active JobManager is running on 50.63.
Log on 50.64:
2019-02-25 10:48:46,421 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at data-hadoop-50-64.bjrs.zybang.com:41511
2019-02-25 10:48:46,421 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
2019-02-25 10:48:46,444 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Web frontend listening at http://data-hadoop-50-64.bjrs.zybang.com:41511.
2019-02-25 10:48:46,502 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2019-02-25 10:48:46,527 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
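Because the web UI port in this run differs from the configured rest.port, it can be convenient to pull the actual address out of the JobManager log. The sketch below extracts the URL from "Web frontend listening at" lines like the ones above. The function name web_frontend_urls is hypothetical, and the log file location in the example is an assumption about the installation.

```shell
# Print the URL from every "Web frontend listening at <url>." line read on stdin.
web_frontend_urls() {
  sed -n 's/.*Web frontend listening at \(http[^ ]*[0-9]\)\.*$/\1/p'
}

# Example (the log file name pattern is an assumption):
# cat log/flink-*-standalonesession-*.log | web_frontend_urls
```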
4. Simulate a JobManager process failure
The JobManager on node 50.63 is currently active. Kill this process manually to simulate a crash and verify that the standby JobManager on 50.64 switches to active.
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
429073 NodeManager
76966 Worker
437470 Jps
427770 NameNode
431177 StandaloneSessionClusterEntrypoint
429833 QuorumPeerMain
[iknow@data-hadoop-50-63 flink-1.7.2]$ kill -9 431177
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
429073 NodeManager
76966 Worker
427770 NameNode
437867 Jps
429833 QuorumPeerMain
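In the transcript above the PID is read off the jps output by hand. As a small sketch, the filter below selects the PID by main class name, so the kill step does not depend on copying the right number. The function name pids_of is hypothetical; it only assumes the two-column "PID class" layout that jps prints above.

```shell
# Print the PID of every JVM whose main class (second jps column) equals $1.
pids_of() {
  awk -v name="$1" '$2 == name { print $1 }'
}

# Example: jps | pids_of StandaloneSessionClusterEntrypoint
```

The same filter works for the YARN case by passing YarnSessionClusterEntrypoint as the class name.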
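5. Verify the HA failover
After the JobManager on 50.63 has been killed, its web UI on 50.63 is no longer reachable.
Because the JobManager process on 50.63 was killed manually, the JobManager on 50.64 automatically switches to active. The switch takes a short while, so wait a moment.
After killing the JobManager on 50.63, access 50.64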
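Instead of waiting for an unspecified moment, one option is to poll the standby's log until it reports leadership. The sketch below assumes that the new leader writes the same "was granted leadership" message that appears in the 50.63 log earlier in this article. The function name wait_for_leadership and the log path in the example are hypothetical.

```shell
# Poll a JobManager log until it contains "was granted leadership" or the
# timeout (in seconds) expires. Returns 0 on success, 1 on timeout.
wait_for_leadership() {
  log=$1
  timeout=$2
  waited=0
  while [ "$waited" -lt "$timeout" ]; do
    if grep -q 'was granted leadership' "$log" 2>/dev/null; then
      return 0
    fi
    sleep 1
    waited=$((waited + 1))
  done
  return 1
}

# Example, run on 50.64 (the log path is an assumption):
# wait_for_leadership log/jobmanager.log 60 && echo "failover complete"
```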
6. Restart the JobManager that was killed
Restart the previously killed JobManager on 50.63
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
76966 Worker
427770 NameNode
445947 Jps
429833 QuorumPeerMain
[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/jobmanager.sh start
Starting standalonesession daemon on host data-hadoop-50-63.bjrs.zybang.com.
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
446533 StandaloneSessionClusterEntrypoint
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
76966 Worker
427770 NameNode
446667 Jps
429833 QuorumPeerMain
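At this point the active JobManager is still the one on 50.64.
Checking HDFS shows that the /flink/flink-standalone-ha/cluster_one directory has been created. (What the blob directory is for is left for later study; the JobManager log shows that a blob directory is created.)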
2019-02-25 10:48:47,747 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink@data-hadoop-50-63.bjrs.zybang.com:36169
2019-02-25 10:48:48,042 INFO org.apache.flink.runtime.blob.FileSystemBlobStore - Creating highly available BLOB storage directory at hdfs://data-hadoop-50-63.bjrs.zybang.com:9000/flink/flink-standalone-ha//cluster_one/blob
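Flink on YARN cluster HA
1. HA cluster environment planning
HA for Flink on YARN relies on YARN's own recovery mechanism.
ZooKeeper is still needed here. Although HA for a Flink-on-YARN cluster relies on YARN's own mechanism, a Flink job depends on checkpoint snapshots when it recovers. The snapshots themselves are configured to be stored on HDFS, but their metadata is kept in ZooKeeper, so the ZooKeeper settings must be configured as well.
First, edit yarn-site.xml in Hadoop to set the maximum number of attempts for a submitted application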
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>8</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
2. Configure the number of YARN application attempts in Flink
vi conf/flink-conf.yaml
yarn.application-attempts: 8
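This parameter is the maximum number of times the Flink job (called an application in YARN) may be restarted when the JobManager (that is, the Application Master) is recovered.
Note that in a Flink on YARN environment, when the JobManager (Application Master) fails, YARN tries to restart the JobManager (AM), and after the restart the Flink job (application) is started again. Therefore yarn.application-attempts should not be set higher than yarn.resourcemanager.am.max-attempts.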
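The constraint between the two settings is easy to break when one file is edited later. As a rough sketch, the helper below reads yarn.application-attempts from a flink-conf.yaml and compares it with the YARN limit passed as an argument. The function name check_attempts is hypothetical, and the YARN value is passed by hand here rather than parsed from yarn-site.xml.

```shell
# Read yarn.application-attempts from a flink-conf.yaml ($1) and succeed only if
# it does not exceed the yarn.resourcemanager.am.max-attempts value given as $2.
check_attempts() {
  flink_attempts=$(awk -F': *' '$1 == "yarn.application-attempts" { print $2; exit }' "$1")
  [ -n "$flink_attempts" ] && [ "$flink_attempts" -le "$2" ]
}

# Example with the values used in this article:
# check_attempts conf/flink-conf.yaml 8 && echo "attempt settings are consistent"
```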
3. Start Flink on YARN and test HA
Start ZooKeeper and Hadoop on 50.63
bin/zkServer.sh start
sbin/start-all.sh
2019-02-25 11:48:20,914 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=data-hadoop-50-63.bjrs.zybang.com:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@182f1e9a
2019-02-25 11:48:20,926 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7156394372293067519.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2019-02-25 11:48:20,929 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181
2019-02-25 11:48:20,930 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181, initiating session
2019-02-25 11:48:20,930 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
2019-02-25 11:48:20,938 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181, sessionid = 0x169228cb2e10006, negotiated timeout = 40000
2019-02-25 11:48:20,940 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
2019-02-25 11:48:21,093 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on data-hadoop-50-63.bjrs.zybang.com:38695 with leader id 45d62639-aac4-4903-ae50-a4d69f987e4a.
JobManager Web Interface: http://data-hadoop-50-63.bjrs.zybang.com:34621
2019-02-25 11:48:21,155 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1551066475117_0001
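The corresponding HA directory has been created on HDFS.
Look at the logs under the Hadoop directory ($HADOOP_CONF_DIR): the current AM log path is under $HADOOP_CONF_DIR/userlogs//. The logs show that YARN restarts the YarnApplicationMasterRunner process and resubmits the Flink job after the restart.
The Hadoop installation does contain a logs/userlogs/ directory.
The JobManager runs inside the YarnSessionClusterEntrypoint process on the corresponding node,
so to test JobManager HA it is enough to test against the YarnSessionClusterEntrypoint process.
Run the following commands to simulate a failure by manually killing the JobManager (YarnSessionClusterEntrypoint).
Kill the JobManager process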
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
428320 SecondaryNameNode
76966 Worker
458095 NodeManager
729 YarnSessionClusterEntrypoint
427770 NameNode
457803 ResourceManager
1581 Jps
457275 FlinkYarnSessionCli
429833 QuorumPeerMain
[iknow@data-hadoop-50-63 flink-1.7.2]$ kill -9 729
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
3540 Jps
428320 SecondaryNameNode
76966 Worker
458095 NodeManager
427770 NameNode
457803 ResourceManager
457275 FlinkYarnSessionCli
3438 YarnSessionClusterEntrypoint
429833 QuorumPeerMain
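The Attempt ID changes from 000001 to 000002, 000003, …, and the process changes as well (in the jps output above, the YarnSessionClusterEntrypoint PID goes from 729 to 3438), which shows that the HA failover succeeded.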
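When the attempt IDs are read from command-line output rather than from the YARN web UI, the counter can be extracted from the ID string. The sketch below assumes the usual YARN attempt ID layout, in which the last underscore-separated field is the zero-padded attempt counter. The function name attempt_number is hypothetical, and the ID in the example is built from the application ID shown earlier, not copied from a log.

```shell
# Print the numeric attempt counter of a YARN application attempt ID,
# that is, the last underscore-separated field without its leading zeros.
attempt_number() {
  printf '%s\n' "${1##*_}" | sed 's/^0*\([0-9]\)/\1/'
}

# Example (illustrative ID):
# attempt_number appattempt_1551066475117_0001_000002
```

A counter greater than 1 after the kill means that YARN has started a new Application Master attempt.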
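Open the ZooKeeper shell and list its nodes: the flink-yarn-ha node has been created.
Run ./bin/zkCli.sh to enter the ZooKeeper shell, and type quit to leave it.
HA configuration. Only the High Availability settings are listed here; the other settings are omitted.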
#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink-yarn-ha
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#
# high-availability.storageDir: hdfs:///flink/ha/
#high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-yarn-ha
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default client
#
# high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.quorum: data-hadoop-50-63.xxx.com:2181
yarn.application-attempts: 8
high-availability.jobmanager.port: 18085
# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open