Production-Grade Flink HA Cluster Setup and Tuning

This article walks through the full process of building a production-grade Flink HA cluster and tuning its parameters.

1. Prerequisites

1.1 Prepare five machines

No.  IP            hostname           Spec     Processes
1    192.168.0.1   flink-job-master1  2C 2G    JobManager, HistoryServer
2    192.168.0.2   flink-job-master2  2C 2G    JobManager, HistoryServer
3    192.168.0.3   flink-tm1          16C 32G  TaskManager
4    192.168.0.4   flink-tm2          16C 32G  TaskManager
5    192.168.0.5   flink-tm3          16C 32G  TaskManager


The JobManager machines can get by with lower specs than the TaskManager machines.
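As a reference for checklist item 1.2.2 below, here is a minimal /etc/hosts sketch for the five nodes above (run as root on every node); the zk1/zk2/zk3 ZooKeeper hosts assumed later also need entries here, their IPs are not shown in this article:

cat >> /etc/hosts <<'EOF'
192.168.0.1 flink-job-master1
192.168.0.2 flink-job-master2
192.168.0.3 flink-tm1
192.168.0.4 flink-tm2
192.168.0.5 flink-tm3
EOF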

1.2 Things to double-check

1.2.1 Is the hostname configured on every machine?
1.2.2 Is /etc/hosts configured?
1.2.3 Is the firewall disabled?
1.2.4 Is SELinux disabled?
1.2.5 Is passwordless SSH set up between the nodes?
1.2.6 Are the required ports opened in the security group?
1.2.7 Is the JDK installed and are its environment variables in effect?
1.2.8 Scala installation and environment variables:

# Scala 2.12: https://downloads.lightbend.com/scala/2.12.8/scala-2.12.8.rpm
# Scala 2.11 (used here): https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.rpm
rpm -ivh scala-2.11.12.rpm

export SCALA_HOME=/usr/share/scala
export PATH=$SCALA_HOME/bin:$PATH
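A quick way to sanity-check the items in 1.2 on each node. This is only a sketch and assumes a CentOS 7-style host with systemd; adjust the commands to your distribution:

hostname                          # 1.2.1 hostname set?
cat /etc/hosts                    # 1.2.2 all cluster hosts listed?
systemctl is-active firewalld     # 1.2.3 should print "inactive"
getenforce                        # 1.2.4 should print "Disabled" (or "Permissive")
ssh flink-tm1 hostname            # 1.2.5 passwordless SSH works?
java -version                     # 1.2.7 JDK on the PATH?
scala -version                    # 1.2.8 Scala on the PATH?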

1.2.9 A working ZooKeeper cluster. For ZooKeeper installation steps, see the "HADOOP HA + HBASE HA + YARN HA" setup article. This article assumes a ZooKeeper ensemble is already available at zk1:2181, zk2:2181, zk3:2181.

If any of the prerequisites above are not met, follow the corresponding steps in the "hadoop 2.8.5 installation pitfalls guide"; they will not be repeated here.
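Before going further it is worth confirming that the external dependencies are reachable from the Flink nodes. A hedged sketch (nc must be installed, and on newer ZooKeeper versions the ruok four-letter word may need to be whitelisted):

# ZooKeeper: each node should answer "imok"
for zk in zk1 zk2 zk3; do echo ruok | nc $zk 2181; echo; done

# HDFS (needed later for HA metadata, checkpoints and savepoints)
hadoop fs -ls /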

This article covers building a Flink standalone cluster in HA mode. For a single-node Flink setup or for running Flink on YARN, see the separate articles on those topics.

2. Flink standalone HA cluster setup

2.1 Download the packages

# https://flink.apache.org/downloads.html#apache-flink-180

cd /opt/wzapp/install
wget http://mirror.bit.edu.cn/apache/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.8.0/flink-json-1.8.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop2-uber/2.8.3-1.8.0/flink-shaded-hadoop2-uber-2.8.3-1.8.0.jar

tar -zxvf flink-1.8.0-bin-scala_2.11.tgz
mv flink-json-1.8.0.jar flink-1.8.0/lib/
mv flink-shaded-hadoop2-uber-2.8.3-1.8.0.jar flink-1.8.0/lib/
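The configuration further down refers to /opt/app/flink-1.8.0 (for example env.hadoop.conf.dir), so this sketch assumes the extracted directory is moved there; adjust the path if you keep it under /opt/wzapp/install:

mkdir -p /opt/app
mv flink-1.8.0 /opt/app/flink-1.8.0
cd /opt/app/flink-1.8.0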

For the meaning of each option used below, see the official Flink configuration documentation.

Environment variables in ~/.bashrc:

export SCALA_HOME=/usr/share/scala
export PATH=$SCALA_HOME/bin:$PATH

# Hadoop runtime environment
export HADOOP_HOME=/opt/data/install/hadoop-2.8.5
export PATH=$PATH:$HADOOP_HOME/sbin
export PATH=$PATH:$HADOOP_HOME/bin

# Hadoop native library settings
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"

export SPARK_HOME=/opt/data/install/spark-2.4.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
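After editing ~/.bashrc on each node, a quick check that the variables are picked up (only a sanity check, not a required step):

source ~/.bashrc
echo $HADOOP_HOME
hadoop version
scala -version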

conf/flink-conf.yaml

#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: flink-job-master1

jobmanager.rpc.port: 8123

jobmanager.heap.size: 2024m

taskmanager.heap.size: 6024m
# Adjust to the hardware: on a 16C/32G TaskManager you can allocate up to 16 slots
taskmanager.numberOfTaskSlots: 4

parallelism.default: 1

# Note: if Hadoop HA is used, this must be the HA nameservice defined in Hadoop's core-site.xml
fs.default-scheme: hdfs://hadoop-ha-nameservice/
# With Hadoop HA, also set the Hadoop conf directory and copy the Hadoop cluster's config files
# into it, so Flink can resolve hadoop-ha-nameservice to the currently active NameNode.
env.hadoop.conf.dir: /opt/app/flink-1.8.0/hadoop-conf

#==============================================================================
# High Availability
#==============================================================================

high-availability: zookeeper

high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181

high-availability.zookeeper.path.root: /flink-ha/
# JobManager metadata lives on HDFS; ZooKeeper only stores a pointer to it (similar in spirit to how HBase locates its meta table)
high-availability.storageDir: hdfs:///flink-ha/recovery/

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
state.backend: rocksdb

state.checkpoints.dir: hdfs://hadoop-ha-nameservice/flink/flink-checkpoints
state.backend.fs.checkpointdir: hdfs://hadoop-ha-nameservice/flink/flink-checkpoints

state.savepoints.dir: hdfs://hadoop-ha-nameservice/flink/flink-savepoints
# Incremental checkpointing does not suit every backend and can lead to odd problems
#state.backend.incremental: true

#==============================================================================
# Rest & web frontend
#==============================================================================
web.submit.enable: true

#==============================================================================
# Advanced
#==============================================================================

io.tmp.dirs: /data/flink/tmp

taskmanager.memory.preallocate: false 

#==============================================================================
# HistoryServer
#==============================================================================

jobmanager.archive.fs.dir: hdfs://hadoop-ha-nameservice/flink/completed-jobs/

historyserver.web.port: 8082

historyserver.archive.fs.dir: hdfs:///flink/completed-jobs/

historyserver.archive.fs.refresh-interval: 10000

# Use a dedicated log directory
env.log.dir: /data/flink/logs
env.log.max: 50

# JMX access, GC tuning, GC logging, and automatic heap dump / error file on crash
env.java.opts: -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -verbose:gc -XX:+PrintGCDetails -Xloggc:/data/flink/logs/flink-gc.log -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCApplicationStoppedTime -server  -XX:SurvivorRatio=4 -XX:MaxTenuringThreshold=15 -XX:ParallelGCThreads=2 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC  -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseCMSCompactAtFullCollection -XX:+CMSClassUnloadingEnabled  -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSMaxAbortablePrecleanTime=5000  -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=20m -XX:ErrorFile=/data/flink/logs/hs_err.log.%p -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/flink/logs/%p.hprof

env.java.opts.historyserver: -Dcom.sun.management.jmxremote.port=10021
env.java.opts.jobmanager: -Dcom.sun.management.jmxremote.port=10022
env.java.opts.taskmanager: -Dcom.sun.management.jmxremote.port=10023

taskmanager.tmp.dirs: /data/flink/taskmanager-tmp
state.backend.rocksdb.localdir: /data/flink/rocksdb/

Note: the following two options have no effect in HA mode; ZooKeeper leader election decides which JobManager is active.

jobmanager.rpc.address
jobmanager.rpc.port
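The configuration above references several local directories (io.tmp.dirs, env.log.dir, taskmanager.tmp.dirs, state.backend.rocksdb.localdir) and a hadoop-conf directory; none of them are created automatically on the OS side. A hedged sketch, to be run on every node (the source path for core-site.xml/hdfs-site.xml assumes the HADOOP_HOME set in ~/.bashrc above):

# local working directories used by flink-conf.yaml
mkdir -p /data/flink/tmp /data/flink/logs /data/flink/taskmanager-tmp /data/flink/rocksdb

# give Flink a copy of the Hadoop HA client configuration
mkdir -p /opt/app/flink-1.8.0/hadoop-conf
cp $HADOOP_HOME/etc/hadoop/core-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml /opt/app/flink-1.8.0/hadoop-conf/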

conf/masters


flink-job-master1:8081
flink-job-master2:8081

conf/slaves


flink-tm1
flink-tm2
flink-tm3
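start-cluster.sh starts the TaskManagers over SSH and expects the same Flink directory and configuration on every node, so copy the finished installation out first. A rough sketch assuming passwordless SSH and the /opt/app path used above:

for h in flink-job-master2 flink-tm1 flink-tm2 flink-tm3; do
  scp -r /opt/app/flink-1.8.0 $h:/opt/app/
done

Then start the cluster and the history server from flink-job-master1: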
bin/start-cluster.sh
bin/historyserver.sh start
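To confirm the cluster really came up, a rough check with jps (the process names below are from Flink 1.8 and may differ in other versions):

# on flink-job-master1 / flink-job-master2
jps | egrep 'StandaloneSessionClusterEntrypoint|HistoryServer'

# on each TaskManager node
ssh flink-tm1 "jps | grep TaskManagerRunner"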

Open http://flink-job-master1:8081 (Flink web UI) and http://flink-job-master1:8082 (history server) in a browser.

A few details:

  • Flink can be restarted without cancelling jobs first: after the cluster comes back up, the previously running jobs start again automatically.
  • Alternatively, cancel the running job (from the web UI or via the CLI) and resume it from the saved state with bin/flink run -s.
  • For a job with a one-minute timeWindow that aggregates once per minute on the full minute, watching the Kafka consumer LAG shows the records being consumed in one burst around 30 seconds past each full minute (see the sketch below).
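For that last point, the LAG pattern can be watched with Kafka's own CLI. A sketch where the broker address (kafka1:9092) and the group id (my-flink-consumer) are placeholders for your job's actual values:

# watch the LAG column refresh once per second
watch -n 1 "kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-flink-consumer"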
# Shut down the cluster
bin/historyserver.sh stop
bin/stop-cluster.sh

# Submit a job
flink-1.8.0/bin/flink run -p 4 -c  com.moheqinglin.xxx /data/flink/flink-job/xxx-1.0.0.jar   par1 par2 

# List jobs
bin/flink list

# Cancel a job
bin/flink cancel  xxxxx

# Cancel a job and take a savepoint at the same time
bin/flink cancel  -s xxxxx

# Restart a job from previously saved state
bin/flink run -s  hdfs://wz-hadoop/flink/flink-checkpoints/xxx -p 4 -c  com.moheqinglin.xxx /data/flink/flink-job/xxx-1.0.0.jar   parm1 parm1 

flink-1.8.0/bin/flink run  -s hdfs://wz-hadoop/flink/flink-savepoints/savepoint-c47550-e63b0fa80f73  -p 3 -c  com.moheqionglin.flink.Counter.Statistics  flink-job/flink-1.0.0.jar  61 param1 param2

flink-1.8.0/bin/flink run   -p 3 -c  com.moheqionglin.flink.Counter.Statistics  flink-job/flink-1.0.0.jar  61 param1 param2
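The savepoint path passed to -s is printed by the cancel -s command, or can be looked up on HDFS, and the job id for cancel comes from bin/flink list; a short sketch:

# find the job id of the running job
bin/flink list

# see which savepoints exist under the configured savepoint directory
hadoop fs -ls /flink/flink-savepoints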

When starting a job, the following error may be reported:

Permission denied: user=flink, access=WRITE, inode="/":hadoop:supergroup:drwxr-xr-x


Explanation: the HDFS directories configured for Flink (checkpoints, savepoints, HA storage) do not exist yet on first startup, so Flink tries to create them and fails because the flink user has no write permission on the HDFS root.
There are two ways to fix this.
Option 1: disable HDFS permission checking in hdfs-site.xml and restart the Hadoop cluster:

<property>
  <name>dfs.permissions</name>
  <value>false</value>
  <description>
    If "true", enable permission checking in HDFS.
    If "false", permission checking is turned off,
    but all other behavior is unchanged.
    Switching from one parameter value to the other does not change the mode,
    owner or group of files or directories.
  </description>
</property>
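If you go with option 1, copy the changed file to every Hadoop node before restarting HDFS. A rough sketch using the standard Hadoop scripts (run on the active NameNode):

# distribute the updated hdfs-site.xml to all Hadoop nodes first, then restart HDFS
$HADOOP_HOME/sbin/stop-dfs.sh
$HADOOP_HOME/sbin/start-dfs.sh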

Option 2: to keep permission checking enabled, create the HDFS directories Flink needs by hand. That is what this article does:

hadoop fs -mkdir /flink
# change the ownership so the flink user can write
hadoop fs -chown -R flink:flink /flink
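Note that high-availability.storageDir in the configuration above points at /flink-ha, which sits outside /flink, so it needs the same treatment; these commands also have to run as the HDFS superuser (the hadoop user in this setup):

hadoop fs -mkdir -p /flink-ha
hadoop fs -chown -R flink:flink /flink-ha

# verify the ownership of both directories
hadoop fs -ls -d /flink /flink-ha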