Deploying Flink and Dinky with docker-compose

Server environment: CentOS 7

1. Configure hosts

vim /etc/hosts

x.x.x.x jobmanager
x.x.x.x taskmanager
x.x.x.x dinky-mysql
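
On each machine, the three entries can be appended in one go; a sketch, with x.x.x.x left as a placeholder for the real IPs:

```shell
# Append the cluster hostnames to /etc/hosts (replace x.x.x.x with the machines' real IPs first)
cat >> /etc/hosts <<'EOF'
x.x.x.x jobmanager
x.x.x.x taskmanager
x.x.x.x dinky-mysql
EOF
```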

2. Directory structure

.
├── conf
│   ├── JobManager
│   │   ├── flink-conf.yaml
│   │   ├── log4j-cli.properties
│   │   ├── log4j-console.properties
│   │   ├── log4j.properties
│   │   ├── log4j-session.properties
│   │   ├── logback-console.xml
│   │   ├── logback-session.xml
│   │   ├── logback.xml
│   │   ├── masters
│   │   ├── workers
│   │   └── zoo.cfg
│   └── TaskManager
│       ├── flink-conf.yaml
│       ├── log4j-cli.properties
│       ├── log4j-console.properties
│       ├── log4j.properties
│       ├── log4j-session.properties
│       ├── logback-console.xml
│       ├── logback-session.xml
│       ├── logback.xml
│       ├── masters
│       ├── workers
│       └── zoo.cfg
├── dinky
│   ├── config
│   │   └── application.yml
│   └── lib
├── dinky-mysql
│   ├── conf
│   │   └── my.cnf
│   └── data
├── docker-compose-flink-dinky.yml
├── jar
├── plugins
└── sql
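
The tree above can be created in one command; this sketch assumes the host base path /opt/flink, which is the path the compose file below maps:

```shell
# Create the host-side directory tree that the compose file maps as volumes
mkdir -p /opt/flink/conf/JobManager \
         /opt/flink/conf/TaskManager \
         /opt/flink/dinky/config \
         /opt/flink/dinky/lib \
         /opt/flink/dinky-mysql/conf \
         /opt/flink/dinky-mysql/data \
         /opt/flink/jar \
         /opt/flink/plugins \
         /opt/flink/sql
```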

Notes:

(For the first container start, it is recommended not to map the configuration-file and lib paths. Copy the default configuration files and lib jars out of the containers to the host, then map those paths on the second start; this makes later feature extension easier.)
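
That first-start/second-start workflow can be sketched as follows; run on the Docker host after the first docker-compose up (while the volume mappings are still disabled), with paths matching the directory tree in section 2:

```shell
# Copy the default files out of the freshly started containers
docker cp jobmanager:/opt/flink/conf/. /opt/flink/conf/JobManager/
docker cp taskmanager:/opt/flink/conf/. /opt/flink/conf/TaskManager/
docker cp jobmanager:/opt/flink/lib/. /opt/flink/jar/
docker cp dinky:/opt/dinky/config/application.yml /opt/flink/dinky/config/application.yml
docker cp dinky:/opt/dinky/lib/. /opt/flink/dinky/lib/
# Stop the stack, enable the volume mappings in the compose file, then start it again
docker-compose -f docker-compose-flink-dinky.yml down
```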

  1. flink/conf/JobManager/flink-conf.yaml - configuration file for the Flink JobManager
  2. flink/conf/TaskManager/flink-conf.yaml - configuration file for the Flink TaskManager
  3. flink/jar/ - dependency jars for the JobManager and TaskManager, mapped to /opt/flink/lib/ inside the containers
  4. flink/dinky/config/application.yml - Dinky's configuration file; the MySQL connection it uses can be configured here
  5. flink/dinky/lib/ - Dinky's dependency jars, mapped to /opt/dinky/lib/ inside the container
  6. flink/dinky-mysql/conf/my.cnf - configuration file for dinky-mysql; max_allowed_packet=1073741824 must be set here, otherwise importing data will fail
  7. flink/dinky-mysql/data/ - data directory mapping for dinky-mysql
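
For note 6, a minimal my.cnf with the required setting would look like this (the [mysqld] section name is standard MySQL; the value is the one the note requires):

```ini
[mysqld]
# Without a large max_allowed_packet, importing the Dinky schema/data fails
max_allowed_packet=1073741824
```

Place it at /opt/flink/dinky-mysql/conf/my.cnf, which the compose file mounts into /etc/mysql/conf.d/.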

3. Create the containers

  1. docker-compose-flink-dinky.yml

    version: '3'
    services:
      jobmanager:
        image: flink:1.14.0-scala_2.12-java8
        container_name: jobmanager
        hostname: jobmanager
        expose: 
          - "6123"
        ports:
          - "48081:8081" # Flink Web UI
          - "6123:6123" # JobManager RPC
          - "6124:6124" # Blob server
          - "8082:8082" # Metrics
          - "6125:6125" # Queryable state server
        command: jobmanager
        networks: 
          - flink-net
        restart: always
        volumes:
          - /opt/flink/conf/JobManager/:/opt/flink/conf/ # skip this mapping on the first start; copy conf to the host, then map it on the second start
          - /opt/flink/jar/:/opt/flink/lib/ # skip on the first start; copy lib to the host, then map on the second start
          - /opt/flink/plugins/:/opt/flink/plugins/
          - /opt/flink/sql/:/opt/flink/sql/
    
      taskmanager:
        image: flink:1.14.0-scala_2.12-java8
        container_name: taskmanager
        hostname: taskmanager
        depends_on:
          - jobmanager
        scale: 1 # increase to run more TaskManagers (remove container_name/hostname first, since they must be unique)
        command: taskmanager
        networks:
          - flink-net
        restart: always
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
        volumes:
          - /opt/flink/conf/TaskManager/:/opt/flink/conf/ # skip this mapping on the first start; copy conf to the host, then map it on the second start
          - /opt/flink/jar/:/opt/flink/lib/ # skip on the first start; copy lib to the host, then map on the second start
          - /opt/flink/plugins/:/opt/flink/plugins/
          - /opt/flink/sql/:/opt/flink/sql/
    
      dinky:
        image: dinkydocker/dinky-standalone-server:0.7.0-flink14
        container_name: dinky
        hostname: dinky
        ports:
          - "38081:8081"
          - "38888:8888"
        networks:
          - flink-net
        restart: always
        environment:
          - SPRING_PROFILES_ACTIVE=prod
          - MYSQL_HOST=dinky-mysql
          - MYSQL_PORT=3306 # containers talk over the compose network on the internal port; 33306 is only published to the host
          - MYSQL_DATABASE=dlink
          - MYSQL_USERNAME=dlink
          - MYSQL_PASSWORD=dlink
        depends_on:
          - dinky-mysql
        volumes:
          - /opt/flink/dinky/config/application.yml:/opt/dinky/config/application.yml # skip on the first start; copy application.yml to the host, then map on the second start
          - /opt/flink/dinky/lib/:/opt/dinky/lib/ # skip on the first start; copy lib to the host, then map on the second start
    
      dinky-mysql: 
        image: dinkydocker/dinky-mysql-server:0.7.0
        container_name: dinky-mysql
        hostname: dinky-mysql
        ports:
          - "33306:3306"
        networks:
          - flink-net
        restart: always
        environment:
          - MYSQL_ROOT_PASSWORD=dlink
          - MYSQL_DATABASE=dlink
        volumes:
          - /opt/flink/dinky-mysql/data/:/var/lib/mysql/
          - /opt/flink/dinky-mysql/conf/:/etc/mysql/conf.d/
    
    networks:
      flink-net:
        driver: bridge
    
  2. Start the containers

    docker-compose -f docker-compose-flink-dinky.yml up -d
    

    If container creation fails with "iptables: No chain/target/match by that name.", make sure firewalld is running (systemctl start firewalld), then restart Docker (systemctl restart docker) so it can recreate its iptables chains

  3. Check the container status

    If all containers are in the Up state but the web pages cannot be reached, check the following two points:

    1. Whether the firewall is stopped, or the required ports are opened in it
    2. Inspect the log files under the logs directory inside the containers

4. Web access verification

  1. Flink: http://ip:48081/

  2. Dinky: http://ip:38888/

    Username / password: admin/admin
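
Beyond opening the pages in a browser, the JobManager's REST API (published on host port 48081 above) can confirm the cluster is actually usable. A sketch: on a live stack you would fetch /overview with curl; here the response is stubbed with a sample in the shape Flink's monitoring REST API returns:

```shell
# On the host: curl -s http://ip:48081/overview
# Sample response shape; the cluster can accept jobs once "taskmanagers" is at least 1
overview='{"taskmanagers":1,"slots-total":1,"slots-available":1,"jobs-running":0}'
echo "$overview" | grep -q '"taskmanagers":[1-9]' && echo "cluster ready"
```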

5. Dinky configuration and testing

5.1 Register a Flink instance

In Dinky, go to Registration Center --> Cluster Management --> Flink Instance Management and register the Flink JobManager address (jobmanager:8081 from inside the compose network, or ip:48081 from outside)


5.2 Run SQL

  1. In the test database, create the tables user_source and user_target

  2. Create the SQL job in Dinky and run it

    -- Source table mapping
    DROP TABLE IF EXISTS user_source_1;
    CREATE TABLE IF NOT EXISTS user_source_1 (
      `id` STRING,
      `name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://ip:3306/test_database?characterEncoding=UTF-8&allowMultiQueries=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true',
      'table-name' = 'user_source',
      'username' = 'root',
      'password' = '123456'
    );
    
    -- Target table mapping
    DROP TABLE IF EXISTS user_target_1;
    CREATE TABLE IF NOT EXISTS user_target_1 (
      `id` STRING,
      `name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://ip:3306/test_database?characterEncoding=UTF-8&allowMultiQueries=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true',
      'table-name' = 'user_target',
      'username' = 'root',
      'password' = '123456'
    );
    
    -- Synchronize data from source to target
    INSERT INTO user_target_1 SELECT * FROM user_source_1;
    
  3. Choose the execution mode and run the SQL. Once the job completes, it also appears in the Flink web UI


  4. If the following error occurs while the SQL is running, the corresponding connector jar is missing and must be added to Dinky's lib directory

    Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:587)
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:561)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:146)
    ... 112 more
    Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    Available factory identifiers are:
    blackhole
    datagen
    filesystem
    print
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:399)
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:583)
    ... 114 more
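
The stack trace above is Flink failing to find a factory for 'connector'='jdbc', which means the JDBC connector (and, for MySQL, its driver) are not on the classpath. For the flink:1.14.0-scala_2.12 image used in this guide, these Maven Central artifacts are a plausible match (the version numbers are illustrative assumptions, not from the original):

```
org.apache.flink:flink-connector-jdbc_2.12:1.14.0    (provides the 'jdbc' table factory)
mysql:mysql-connector-java:8.0.28                    (MySQL JDBC driver)
```

Drop the jars into /opt/flink/dinky/lib/ on the host, and into /opt/flink/jar/ so the Flink JobManager and TaskManagers load them too, then restart the containers.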
    
