flink中如何实现基于k8s的环境搭建

16次阅读
没有评论

这篇文章主要介绍了 flink 中如何实现基于 k8s 的环境搭建,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让丸趣 TV 小编带着大家一起了解一下。

前面写了一些 flink 的基础组件,但是还没有说过 flink 的环境搭建,现在我们来说下基本的环境搭建
1. 使用 StatefulSet 的原因
对于 Flink 来说,使用 sts 的最大的原因是 pod 的 hostname 是有序的;这样潜在的好处有
hostname 为 - 0 和 - 1 的 pod 可以直接指定为 jobmanager;可以使用一个 statefulset 启动一个 cluster,而 deployment 必须 2 个;Jobmanager 和 TaskManager 分别独立的 deployment
pod 由于各种原因 fail 后,由于 StatefulSet 重新拉起的 pod 的 hostname 不变,集群 recover 的速度理论上可以比 deployment 更快(deployment 每次主机名随机)
2. 使用 StatefulSet 部署 Flink
2.1 docker 的 entrypoint
由于要由主机名来判断是启动 jobmanager 还是 taskmanager,因此需要在 entrypoint 中去匹配设置的 jobmanager 的主机名是否有一致
传入参数为:cluster ha;则自动根据主机名判断启动那个角色;也可以直接指定角色名称
docker-entrypoint.sh 的脚本内容如下:

#!/bin/sh
 
# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD= $1 
# if use cluster model, pod ${JOB_CLUSTER_NAME}-0,${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD} ==  cluster  ]; then
 jobmanagers=(${JOB_MANGER_HOSTS//,/ })
 ACTION_CMD= taskmanager 
 for i in ${!jobmanagers[@]}
 do
 if [  $(hostname -s)  ==  ${jobmanagers[i]}  ]; then
 ACTION_CMD= jobmanager 
 echo  pod hostname match jobmanager config host, change action to jobmanager. 
 fi
 done
 
# if ha model, replace ha configuration
if [  $2  ==  ha  ]; then
 sed -i -e  s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 
if [ ${ACTION_CMD} ==  help  ]; then
 echo  Usage: $(basename  $0) (cluster ha|jobmanager|taskmanager|local|help) 
 exit 0
elif [ ${ACTION_CMD} ==  jobmanager  ]; then
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
 echo  Starting Job Manager 
 sed -i -e  s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g   $FLINK_CONF_DIR/flink-conf.yaml 
 
 echo  config file:     grep  ^[^\n#]   $FLINK_CONF_DIR/flink-conf.yaml 
 exec  $FLINK_HOME/bin/jobmanager.sh  start-foreground cluster
 
elif [ ${ACTION_CMD} ==  taskmanager  ]; then
 TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
 echo  Starting Task Manager 
 
 sed -i -e  s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g   $FLINK_CONF_DIR/flink-conf.yaml 
 
 echo  config file:     grep  ^[^\n#]   $FLINK_CONF_DIR/flink-conf.yaml 
 exec  $FLINK_HOME/bin/taskmanager.sh  start-foreground
elif [ ${ACTION_CMD} ==  local  ]; then
 echo  Starting local cluster 
 exec  $FLINK_HOME/bin/jobmanager.sh  start-foreground local
 
exec  $@

2.2. 使用 ConfigMap 分发 hdfs 和 flink 配置文件
ConfigMap 介绍参考:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files
Q:为什么使用 ConfigMap
A:由于 hadoop 配置文件在不同的环境不一样,不方便打包到镜像里面;因此合适的方式就只有 2 种,使用 ConfigMap 和 Pod 的 InitContainer。使用 InitContainer 的话,可以 wget 获取远程的一个配置文件,但是这样还需要依赖一个配置服务。相比而已,ConfigMap 更简单。
创建 ConfigMap
[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf –from-file=hdfs-site.xml –from-file=core-site.xml
[root@rc-mzgjg ~]# kubectl create configmap flink-conf –from-file=flink-conf/log4j-console.properties –from-file=flink-conf/flink-conf.yaml
使用 describe 命令查看创建的名词为 hdfs-conf 的 ConfigMap,会显示文件的内容到控制台
[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name:         hdfs-conf
Namespace:    default
Labels:       none
Annotations:   none
Data
====
core-site.xml:
通过 volumeMounts 使用 ConfigMap
Pod 的 Container 要使用配置文件,则可以通过 volumeMounts 方式挂载到 Container 中。如下 demo 所示,将配置文件挂载到 /home/xxxx/conf/hadoop 目录下

apiVersion: apps/v1
kind: StatefulSet
metadata:
 name: flink-jm
spec:
 selector:
 matchLabels:
 app: flink-jm
 serviceName: flink-jm
 replicas: 2
 podManagementPolicy: Parallel
 template:
 metadata:
 labels:
 app: flink-jm
 spec:
 terminationGracePeriodSeconds: 2
 containers:
 - name: test
 imagePullPolicy: Always
 image: ip:5000/test:latest
 args: [sleep ,  1d]
 volumeMounts:
 - name: hdfs-conf
 mountPath: /home/xxxx/conf/hadoop
 volumes:
 - name: hdfs-conf
 configMap:
 # Provide the name of the ConfigMap containing the files you want to add to the container
 name: hdfs-conf

创建好 Pod 后,查看配置文件的挂载
[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml – ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml – ..data/hdfs-site.xml
配置文件是链接到了..data 目录
1.10 才能支持 Pod 多 Container 的 namespace 共享
最初的想法是一个 Pod 里面多个 Container,然后配置文件是其中一个 Container;测试验证起数据目录并不能互相访问;如预想的配置,其中一个 Container 里面的 image 是 hdfs-conf 的配置文件

containers:
 - name: hdfs-conf
 imagePullPolicy: Always
 image: ip:5000/hdfs-dev:2.6
 args: [sleep ,  1d]
 - name: flink-jm
 imagePullPolicy: Always
 image: ip:5000/flink:1.4.2

实际验证,两个 Container 的只能共享网络,文件目录彼此看不见
“Share Process Namespace between Containers in a Pod”这个是 Kubernates 1.10 才开始支持,参考
https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/
2.3 StatefulSet 的配置
Flink 的配置文件和 hadoop 的配置文件,依赖 ConfigMap 来分发

环境变量名称

参数

内容

  说明

 

FLINK_CLUSTER_IDENT

namespace/StatefulSet.name

default/flink-cluster

用来做 zk ha 设置和 hdfs checkpiont 的根目录  

FLINK_ZK_QUORUM

env:FLINK_ZK_QUORUM

ip:2181

HA ZK 的地址  

JOB_MANAGER_HEAP_MB

env:JOB_MANAGER_HEAP_MB

value:containers.resources.memory.limit -1024

512JM 的 Heap 大小,由于存在 Netty 的堆外内存,需要小于 container.resources.memory.limits;否则容易 OOM kill 

JOB_MANGER_HOSTS
StatefulSet.name-0,StatefulSet.name-1
flink-cluster-0,flink-cluster-1

JM 的主机名,短主机名;可以不用 FQDN 

TASK_MANAGER_HEAP_MB

env:TASK_MANAGER_HEAP_MB 

value: containers.resources.memory.limit -1024

512TM 的 Heap 大小,由于存在 Netty 的堆外内存,需要小于 container.resources.memory.limits;否则容易 OOM kill 

TASK_MANAGER_NUMBER_OF_TASK_SLOTS

containers.resources.cpu.limits

2TM 的 slot 数量,根据 resources.cpu.limits 来设置  

Pod 的 imagePullPolicy 策略,测试环境 Always,每次都 pull,方便验证;线上则是 IfNotPresent;线上如果对 images 做了变更,必须更改 images 的 tag
完整的内容可以参考如下:

# headless service for statefulset
apiVersion: v1
kind: Service
metadata:
 name: flink-cluster
 labels:
 app: flink-cluster
spec:
 clusterIP: None
 ports:
 - port: 8080
 name: ui
 selector:
 app: flink-cluster
# create flink statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
 name: flink-cluster
spec:
 selector:
 matchLabels:
 app: flink-cluster
 serviceName: flink-cluster
 replicas: 4
 podManagementPolicy: Parallel
 template:
 metadata:
 labels:
 app: flink-cluster
 spec:
 terminationGracePeriodSeconds: 2
 containers:
 - name: flink-cluster
 imagePullPolicy: Always
 image: ip:5000/flink:1.4.2
 args: [cluster ,  ha]
 volumeMounts:
 - name: hdfs-conf
 mountPath: /home/xxxx/conf/hadoop
 - name: flink-conf
 mountPath: /home/xxxx/conf/flink
 - name: flink-log
 mountPath: /home/xxxx/logs
 resources:
 requests:
 memory:  1536Mi 
 cpu: 1
 limits:
 memory:  1536Mi 
 cpu: 2
 env:
 - name: JOB_MANGER_HOSTS
 value:  flink-cluster-0,flink-cluster-1 
 - name: FLINK_CLUSTER_IDENT
 value:  default/flink-cluster 
 - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
 value:  2 
 - name: FLINK_ZK_QUORUM
 value:  ip:2181 
 - name: JOB_MANAGER_HEAP_MB
 value:  512 
 - name: TASK_MANAGER_HEAP_MB
 value:  512 
 ports:
 - containerPort: 6124
 name: blob
 - containerPort: 6125
 name: query
 - containerPort: 8080
 name: flink-ui
 volumes:
 - name: hdfs-conf
 configMap:
 # Provide the name of the ConfigMap containing the files you want to add to the container
 name: hdfs-conf
 - name: flink-conf
 configMap:
 name: flink-conf
 - name: flink-log
 hostPath:
 # directory location on host
 path: /tmp
 # this field is optional
 type: Directory

 
3. 测试环境对外暴露 Flink UI
由于测试环境使用 Flannel 进行网络通信,在 K8S 集群外部无法访问到 Flink UI 的 IP 和端口,因此需要通过 NodePort 方式将内部 IP 映射出来。配置如下:

# only for test k8s cluster
# use service to expose flink jobmanager 0 s web port
apiVersion: v1
kind: Service
metadata:
 labels:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-0
 name: flink-web-0
 namespace: default
spec:
 ports:
 - port: 8080
 protocol: TCP
 targetPort: 8080
 selector:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-0
 type: NodePort
# use service to expose flink jobmanager 1 s web port
apiVersion: v1
kind: Service
metadata:
 labels:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-1
 name: flink-web-1
 namespace: default
spec:
 ports:
 - port: 8080
 protocol: TCP
 targetPort: 8080
 selector:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-1
 type: NodePort

 
4. 服务部署状态
执行完前面操作后,可以查看到当前的 StatefulSet 状态
[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME            DESIRED   CURRENT   AGE       CONTAINERS      IMAGES
flink-cluster   4         4         1h        flink-cluster   ip:5000/flink:1.4.2
容器的 Pod 状态
[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster  -o wide
NAME              READY     STATUS    RESTARTS   AGE       IP            NODE
flink-cluster-0   1/1       Running   0          1h        ip1   ip5
flink-cluster-1   1/1       Running   0          1h        ip2   ip6
flink-cluster-2   1/1       Running   0          1h        ip3   ip7
flink-cluster-3   1/1       Running   0          1h        ip4   ip8
相关的 Service 信息
[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster  -o wide
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE       SELECTOR
flink-cluster   ClusterIP   None             none        8080/TCP         2h        app=flink-cluster
flink-web-0     NodePort    10.254.8.103     none        8080:30495/TCP   1h        app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1     NodePort    10.254.172.158   none        8080:30158/TCP   1h        app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1
根据 Service 的信息;可以通过任何一个 k8s node 的 ip 地址加 PORT 来访问 Flink UI

这里主要说一下,在搭建的过程中遇到了一个和权限相关的问题
错误日志如下
ERROR setFile(null,true) call failed
FileNotFoundException:no such file or directory
原因:是因为 flink 服务缺少日志目录的权限
修改方式:
1.adduser flink 添加相应的用户
2.chown -R flink:flink /home/xxxx/logs

感谢你能够认真阅读完这篇文章,希望丸趣 TV 小编分享的“flink 中如何实现基于 k8s 的环境搭建”这篇文章对大家有帮助,同时也希望大家多多支持丸趣 TV,关注丸趣 TV 行业资讯频道,更多相关知识等着你来学习!