Flink CDC系列之:理解学习Kubernetes模式

Kubernetes 是一种流行的容器编排系统,用于自动化计算机应用程序的部署、扩展和管理。Flink 的原生 Kubernetes 集成允许您直接在正在运行的 Kubernetes 集群上部署 Flink。此外,由于 Flink 可以直接与 Kubernetes 通信,因此它能够根据所需资源动态分配和取消分配 TaskManager。

Apache Flink 还提供了一个 Kubernetes 运算符,用于管理 Kubernetes 上的 Flink 集群。它支持独立和原生部署模式,并大大简化了 Kubernetes 上 Flink 资源的部署、配置和生命周期管理。

准备

本文档假设正在运行的 Kubernetes 集群满足以下要求:

  • Kubernetes >= 1.9。
  • KubeConfig,有权列出、创建、删除 pod 和服务,可通过 ~/.kube/config 进行配置。可以通过运行 kubectl auth can-i <list|create|edit|delete> pods 来验证权限。
  • 启用 Kubernetes DNS。
  • 具有 RBAC 权限的默认服务帐户可以创建、删除 pod。

会话模式

Flink 可在所有类 UNIX 环境中运行,即 Linux、Mac OS X 和 Cygwin(适用于 Windows)。

可以参考概述来检查支持的版本并下载 Flink 的二进制版本,然后提取存档:

tar -xzf flink-*.tgz

应该设置 FLINK_HOME 环境变量,例如:

export FLINK_HOME=/path/flink-*

启动会话集群

要在 k8s 上启动会话集群,请运行 Flink 附带的 bash 脚本:

cd /path/flink-*
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

启动成功后返回信息如下:

org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://my-first-flink-cluster-rest.default:8081

然后,需要将这两个配置添加到您的 flink-conf.yaml 中:

rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}

{{REST_PORT}} 和 {{NODE_IP}} 应该被你的 JobManager Web 界面的实际值替换。

设置 Flink CDC

从发布页面下载 Flink CDC 的 tar 文件,然后提取存档:

tar -xzf flink-cdc-*.tar.gz

解压后的 flink-cdc 包含四个目录:bin、lib、log 和 conf。

从发布页面下载连接器 jar,并将其移动到 lib 目录。

下载链接仅适用于稳定版本,SNAPSHOT 依赖项需要您根据特定分支自行构建。

提交 Flink CDC Job

下面是同步整个数据库的示例文件mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
 type: mysql
 hostname: localhost
 port: 3306
 username: root
 password: 123456
 tables: app_db.\.*
 server-id: 5400-5404
 server-time-zone: UTC

sink:
 type: doris
 fenodes: 127.0.0.1:8030
 username: root
 password: ""

pipeline:
 name: Sync MySQL Database to Doris
 parallelism: 2

需要根据自己的需求修改配置文件。最后使用Cli将作业提交到Flink Standalone集群。

cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml

提交成功后返回信息如下:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

然后你就可以通过 Flink Web UI 找到正在运行的名为 Sync MySQL Database to Doris 的作业。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部