一、组件

1. JobManager

作业管理器是一个 Flink 集群中任务管理和调度的核心,是控制应用执行的主进程

1.1 JobMaster
  • JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job)。JobMaster 和具体的 Job 是一一对应的,多个 Job 可以同时运行在一个 Flink 集群中, 每个 Job 都有一个自己的 JobMaster
  • 在作业提交时,JobMaster 会先接收到要执行的应用,即客户端提交来的 Jar 包、数据流图 (dataflow graph) 和作业图 (JobGraph);然后 JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作执行图 (ExecutionGraph),它包含了所有可以并发执行的任务;接着 JobMaster 会向资源管理器 (ResourceManager) 发出请求,申请执行任务必要的资源,一旦获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上。而在运行过程中, JobMaster 会负责所有需要中央协调的操作,比如检查点 (checkpoints) 的协调
1.2 ResourceManager

资源管理器

  • ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个
  • 资源是指 TaskManager 的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行
  • Flink 的 ResourceManager,针对不同的环境和资源管理平台有不同的具体实现
    • 在 Standalone 部署时,因为 TaskManager 是单独启动的(没有 Per-Job 模式),所以 ResourceManager 只能分发可用 TaskManager 的任务槽,不能单独启动新 TaskManager
    • 在有资源管理平台 (如 Yarn) 时,当新的作业申请资源,ResourceManager 会将有空闲槽位的 TaskManager 分配给 JobMaster。如果 ResourceManager 没有足够的任务槽,它可以向资源提供平台发起会话,请求提供启动新 TaskManager 进程的容器。另外,ResourceManager 还负责停掉空闲的 TaskManager,释放计算资源
1.3 Dispatcher

分发器

  • Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件
  • Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息
  • Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉

2. TaskManager

任务管理器

  • TaskManager 是 Flink 中的工作进程,负责数据流的具体计算,所以也被称为 Worker。Flink 集群中必须至少有一个 TaskManager;在分布式系统中,通常会有多个 TaskManager 运行,每一个 TaskManager 都包含了一定数量的任务槽(task slots)
  • Slot 是资源调度的最小单位,slot 的数量限制了 TaskManager 能够并行处理的任务数量。启动之后,TaskManager 会向 ResourceManager 注册它的 slots;收到 ResourceManager 的指令后,TaskManager 就会将一个或者多个槽位提供给 JobMaster 调用, JobMaster 就可以分配任务来执行了
  • 在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager 交换数据

二、任务提交流程

1. 整体抽象流程

在这里插入图片描述

  • 一般情况下,由客户端(App)通过分发器提供的 REST 接口,将作业提交给 JobManager
  • 由分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster
  • JobMaster 将 JobGraph 解析为可执行的 ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)
  • 资源管理器判断当前是否由足够的可用资源;如果没有,启动新的 TaskManager
  • TaskManager 启动之后,向 ResourceManager 注册自己的可用任务槽(slots)
  • 资源管理器通知 TaskManager 为新的作业提供 slots
  • TaskManager 连接到对应的 JobMaster,提供 slots
  • JobMaster 将需要执行的任务分发给 TaskManager
  • TaskManager 执行任务,互相之间可以交换数据

2. Flink on Yarn 任务提交流程

2.1 会话模式任务提交流程

在这里插入图片描述

  • 先启动一个 YARN session,启动 JobManager,此时只有 ResourceManager 和 Dispatcher 在运行
  • 客户端将 flink jar 包和相关配置上传到 HDFS
  • 客户端通过 REST 接口,将作业提交给分发器
  • 分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster
  • JobMaster 向资源管理器请求资源(slots)
  • 资源管理器向 YARN 的资源管理器请求 container 资源
  • YARN 启动新的 TaskManager 容器
  • TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽
  • 资源管理器通知 TaskManager 为新的作业提供 slots
  • TaskManager 连接到对应的 JobMaster,提供 slots
  • JobMaster 将需要执行的任务分发给 TaskManager,执行任务
2.2 单作业模式任务提交流程

在这里插入图片描述

  • 客户端将作业提交给 YARN 的资源管理器,这一步中会同时将 Flink 的 Jar 包和配置上传到 HDFS,以便后续启动 Flink 相关组件的容器
  • YARN 的资源管理器分配 Container 资源,启动 Flink JobManager,并将作业提交给JobMaster。这里省略了 Dispatcher 组件
  • JobMaster 向资源管理器请求资源(slots)
  • 资源管理器向 YARN 的资源管理器请求 container 资源
  • YARN 启动新的 TaskManager 容器
  • TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽
  • 资源管理器通知 TaskManager 为新的作业提供 slots
  • TaskManager 连接到对应的 JobMaster,提供 slots
  • JobMaster 将需要执行的任务分发给 TaskManager,执行任务

三、任务调度原理

1. 整体调度过程

在这里插入图片描述

  • Flink 代码在被提交执行后首先经过优化器和图生成器会生成数据流图
  • Flink Client 的 ActorSystem 创建 Actor 将数据流图发送给 JobManager 中的 Actor
  • JobManager 会不断接收 TaskManager 的心跳消息,从而可以获取到有效的 TaskManager
  • JobManager 通过调度器在 TaskManager 中调度执行 Task (Task 对应一个线程)
  • 在程序运行过程中,Task 与 Task 之间可以进行数据传输
1.1 Job Client
  • 主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回
  • Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点
  • Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。执行完成后,Job Client 将结果返回给用户
1.2 JobManager
  • 主要职责是调度工作并协调任务做检查点
  • 集群中至少要有一个 master,master 负责调度 task,协调checkpoints 和容错
  • 高可用设置的话可以有多个 master,但要保证一个是 leader,其他是standby
  • JobManager 包含 Actor System、Scheduler、CheckPoint 三个重要的组件
  • JobManager 从客户端接收到任务以后, 首先生成优化过的执行计划, 再调度到 TaskManager 中执行
1.3 TaskManager
  • 主要职责是从 JobManager 处接收任务, 并部署和启动任务, 接收上游的数据并处理
  • TaskManager 是在 JVM 中的一个或多个线程中执行任务的工作节点
  • TaskManager 在创建之初就设置好了 Slot, 每个 Slot 可以执行一个任务

2. 相关概念

2.1 数据流图

Dataflow Graph,Flink 程序中所有算子按照逻辑顺序连接在一起的一张图,由 Source、Transformation、Sink 三部分组成,以一个或多个 Source 开始以一个或多个 Sink 结束,类似 Spark 的 DAG

在这里插入图片描述

  • Source:数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等
  • Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select
  • Sink:接收器,Flink 将转换计算后的数据发送的地点 ,Flink 常见的 Sink 有:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等
2.2 并行子任务与并行度
  • 并行子任务 (Subtask):一个算子操作可以 “复制” 成多份分布到不同的节点去运行,每个节点所运行的任务称为该算子的一个并行子任务

  • 并行度 (Parallelism):

    • 针对数据流图中的每一步操作而言,一个算子操作的并行子任务个数称之为它的并行度 (Parallelism)
    • 针对整个数据流图而言,它的所有算子操作中的最大并行度称之为整个 Stream 的并行度
      在这里插入图片描述
  • 并行度的设置:

    • Flink 代码中设置:

      //Flink环境对象调用setParallelism(n)方法设置整个程序全局的并行度
      StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2)
      
      //每个算子操作调用setParallelism(n)方法设置当前算子的并行度
      dataStream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
      
    • 提交 Flink 应用时设置

      #命令行使用 -p 设置并行度
      bin/flink run –p 2
      
      #WebUI中在提交应用的配置框中填写并行度
      
      
    • 配置文件中设置,对整个集群生效

      vim flink-conf.yaml
      
      parallelism.default: 2
      
2.3 算子链
  • 不同算子之间的数据传输方式:

    • 一对一(One-to-one,forwarding):类似 Spark 的窄依赖,从上游向下游进行数据传输不会改变数据的分区和顺序。例如:source、map、 filter、 flatMap 等算子之间的数据传输
    • 重分区(Redistributing):类似 Spark 的宽依赖和 Shuffle 过程 (重分区好比发牌,shuffle好比洗牌),上游的数据会根据不同的策略 (基于 key hash 值、broadcast、rebalance轮询以及完全随机) 传输到不同的下游中,会造成数据分区和顺序的改变。例如:map 和 keyBy 之间的数据传输、window 和 Sink 之间的数据传输
  • 算子链 (Operator Chain):并行度相同、同一个 slot 共享组且数据传输方式为 one-to-one 的算子们可以合并成为一个算子链,形成一个 Task 由一个线程执行

  • 设置:

    //全局禁用算子链
    env.disableOperatorChaining();
    
    //禁用算子链
    .map(word -> Tuple2.of(word, 1L)).disableChaining();
    
    //从当前算子开始新链
    .map(word -> Tuple2.of(word, 1L)).startNewChain()
    
2.4 Task Slot 和槽共享
  • Flink 的每一个任务 (Task) 需要一个线程来执行;TaskManager 是一个 JVM 进程,在其中可以启动多个独立线程来执行任务

  • 为了控制一个 TaskManager 能接收多少个 Task,通过 Task Slot 对每个任务运行所占用的资源
    做出明确的划分,一个 TaskManager 至少有一个 Task Slot

  • Task Slot:在 TaskManager 上拥有计算资源的一个固定大小的子集,一个 TaskManager 上的所有 Task Slot 会均分整个内存,所以任务之间不受影响

  • Task Slot 配置:

    vim flink-conf.yaml
    
    taskmanager.numberOfTaskSlots: 8
    
    #由于slot之间不会涉及 CPU 的隔离,所以可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争
    
  • 槽共享:默认情况下,同一个作业的不同任务节点的子任务可以在同一个 Task Slot 上执行,实现槽共享。但同一个任务节点的并行子任务必须独立占据一个 Task Slot 执行
    在这里插入图片描述

  • 通过设置 “slot 共享组” (SlotSharingGroup) 可以让某个算子对应的任务完全独占一个 slot

    //共享组名称自定义,不设置则与前一个算子同属一个共享组,默认是default
    .map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1"); 
    
    //此时,整个作业总共需要的 slot 数量,就是各个 slot 共享组最大并行度的总和
    
  • 并行度与 Task Slot:并行度是程序运行时实际使用的并发线程资源;Task Slot 是整个 TaskManager 总共可用的并发线程资源

2.5 执行流程图转换
  • Flink 中执行流程图转换可以分为:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
  • StreamGraph:逻辑流图或数据流图,它是根据用户通过 Stream API 编写的代码生成的最初的执行图,用来表示程序的拓扑结构
  • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化是将多个符合条件的节点 chain 在一起作为一个节点
  • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “图”,并不是一个具体的数据结构

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部