一、Flink CDC、Flink、CDC各有啥关系

  Flink:流式计算框架,不包含 Flink CDC,和 Flink CDC没关系

  CDC:是一种思想,理念,不涉及某一门具体的技术。CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。

  Flink CDC:是 CDC 的一种实现而已,不属于 Flink 子版块。这个技术是阿里开发的。目的是为了丰富 Flink 的生态。

1.1 概述

  Flink CDC 基于数据库日志的 Change Data Caputre 技术,实现了全量和增量的一体化读取能力,并借助 Flink 优秀的管道能力和丰富的上下游生态,支持捕获多种数据库的变更,并将这些变更实时同步到下游存储。

1.2 和 jdbc Connectors 对比

  JDBC Connectors 连接器,确实可以读取外部的 数据库。比如:MySQL、Oracle、SqlServer等。但是,JDBC连数据库,只是瞬时操作,没办法持续监听数据库的数据变化。

  Flink CDC Connectors,可以实现数据库的变更捕获,能够持续不断地把变更数据同步到下游的系统中。

官网概述:https://ververica.github.io/flink-cdc-connectors/
github链接:https://github.com/ververica/flink-cdc-connectors

二、使用

  FlinkCDC 同步数据,有两种方式,一种是 FlinkSQL 的方式,一种是Flink DataStream 和 Table API 的方式。

  我这里直接用的是 ieda 测试的 DataStream 方式。

  代码来自:https://github.com/yclxiao/flink-cdc-demo/tree/main/src/main/java/com/yclxiao/flinkcdcdemo

  CloudAcctProfit2DwsHdjProfitRecordAPI.java

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.xiaoqiang.utils.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;

public class CloudAcctProfit2DwsHdjProfitRecordAPI {
    private static final Logger LOG = LoggerFactory.getLogger(CloudAcctProfit2DwsHdjProfitRecordAPI.class);
    private static String MYSQL_HOST = "x.x.x.x";
    private static int MYSQL_PORT = 3306;
    private static String MYSQL_USER = "root";
    private static String MYSQL_PASSWD = "xiaoqiang";
    private static String SYNC_DB = "league_test";
    private static List<String> SYNC_TABLES = Arrays.asList("league_test.oc_settle_profit");

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(MYSQL_HOST)
                .port(MYSQL_PORT)
                .databaseList(SYNC_DB) // set captured database
                .tableList(String.join(",", SYNC_TABLES)) // set captured table
                .username(MYSQL_USER)
                .password(MYSQL_PASSWD)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);

        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + "xiaoqiang-flink");

        List<String> tableList = getTableList();
        System.out.println("tableList--->"+tableList);
        for (String tbl : tableList) {
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, "oc_settle_profit");
//            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            // 流的数据sink出去
            filterStream.addSink(new CustomDealDataSink())
                    .name("sink " + tbl);
        }
        env.execute("xiaoqiang-flink");
    }

    /**
     * 自定义sink
     */
    private static class CustomDealDataSink extends RichSinkFunction<String> {
        private transient Connection coalitiondbConnection;
        private transient Statement coalitiondbStatement;
        private transient Connection cloudConnection;
        private transient Statement cloudStatement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 在这里初始化 JDBC 连接
            coalitiondbConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/league_test", "root", "");
            coalitiondbStatement = coalitiondbConnection.createStatement();
            cloudConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/cloud_test", "root", "");
            cloudStatement = cloudConnection.createStatement();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // 解析拿到的CDC-JSON数据
            JSONObject rowJson = JSON.parseObject(value);
            String outNo = rowJson.getString("out_no");
            Integer userType = rowJson.getInteger("user_type");
            String id = rowJson.getString("id");
            String payOrderNo = rowJson.getString("pay_order_no");
            String title = rowJson.getString("title");
            String fromUserId = rowJson.getString("from_user_id");
            String fromAccountId = rowJson.getString("from_account_id");
            String userId = rowJson.getString("user_id");
            String accountId = rowJson.getString("account_id");
            Integer amount = rowJson.getInteger("amount");
            Integer profitState = rowJson.getInteger("profit_state");
            Date profitTime = rowJson.getTimestamp("profit_time");
            Integer refundState = rowJson.getInteger("refund_state");
            Date refundTime = rowJson.getTimestamp("refund_time");
            Date addTime = rowJson.getTimestamp("add_time");
            String remark = rowJson.getString("remark");
            String acctCircle = rowJson.getString("acct_circle");
            Integer fromUserType = rowJson.getInteger("from_user_type");
            String companyId = rowJson.getString("company_id");
            String bizCompanyId = rowJson.getString("biz_company_id");
//            if (1 != profitState || !"PG11111".equals(acctCircle)) {
//                return;
//            }
//
//            // 读取相关表的数据(与其他表进行关联)
//            Integer bizType = null;
//            String contributeUserId = null;
//            String relationBrandOwnerId = null;
//            ResultSet virtualOrderResultSet = coalitiondbStatement.executeQuery("select * from tc_virtual_order where order_type != 2 and id = '" + outNo + "'");
//            // 如果是tc_virtual_order订单(上岗卡、安心卡、课程)
//            if (virtualOrderResultSet.next()) {
//                // 处理数据逻辑
//                Integer virtualOrder4OrderType = virtualOrderResultSet.getInt("order_type");
//                String virtualOrder4CompanyId = virtualOrderResultSet.getString("company_id");
//                String virtualOrder4BrandId = virtualOrderResultSet.getString("brand_id");
//                // 上岗卡订单排掉,因为已经有别的任务处理了
//                if (virtualOrder4OrderType == 2) {
//                    return;
//                }
//                // orderType转换
//                if (virtualOrder4OrderType == 6) {
//                    bizType = 10;
//                } else if (virtualOrder4OrderType == 1) {
//                    bizType = 11;
//                } else if (virtualOrder4OrderType == 5) {
//                    bizType = 12;
//                }
//                // userType转换
//                if (virtualOrder4OrderType == 6 && userType == 92) {
//                    contributeUserId = virtualOrder4CompanyId;
//                } else if (virtualOrder4OrderType == 1 && userType == 92) {
//                    contributeUserId = virtualOrder4CompanyId;
//                } else if (virtualOrder4OrderType == 5 && userType == 92) {
//                    contributeUserId = virtualOrder4CompanyId;
//                }
//                // relationBrandOwnerId转换
//                if (virtualOrder4OrderType == 6 && userType == 90) {
//                    relationBrandOwnerId = virtualOrder4BrandId;
//                } else if (virtualOrder4OrderType == 1 && userType == 90) {
//                    relationBrandOwnerId = virtualOrder4BrandId;
//                } else if (virtualOrder4OrderType == 5 && userType == 90) {
//                    relationBrandOwnerId = virtualOrder4BrandId;
//                }
//                // remark转换
//                if (virtualOrder4OrderType == 1 || virtualOrder4OrderType == 5) {
//                    remark = title;
//                }
//            } else {
//                // 如果不是tc_virtual_order的数据,则可能是其他数据,此处只保留好到家实物商品数据
//                if (StringUtils.isBlank(payOrderNo)) {
//                    return;
//                }
//                ResultSet acctPayOrderResultSet = cloudStatement.executeQuery("select * from acct_pay_order t where t.id = '" + payOrderNo + "'");
//                if (!acctPayOrderResultSet.next()) {
//                    return;
//                }
//                Integer payCate = acctPayOrderResultSet.getInt("pay_cate");
//                if (200100 != payCate) { // 好到家实物商品类型
//                    return;
//                }
//
//                bizType = 20;
//                if (userType == 92 && StringUtils.isNotBlank(bizCompanyId)) {
//                    contributeUserId = bizCompanyId;
//                } else if (userType == 90 && StringUtils.isNotBlank(bizCompanyId)) {
//                    ResultSet brandOwnerIdResultSet = cloudStatement.executeQuery("select * from uc_brand_partner t where t.company_id = '" + bizCompanyId + "'");
//                    if (brandOwnerIdResultSet.next()) {
//                        relationBrandOwnerId = brandOwnerIdResultSet.getString("brand_owner_id");
//                    }
//                }
//            }
//            if (StringUtils.isBlank(remark)) {
//                remark = title;
//            }

            // 数据写入到mysql
            String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n" +
                    "                                                    user_type, amount, profit_time, state, acct_circle, biz_type,\n" +
                    "                                                    contribute_user_id, relation_brand_owner_id, remark, add_time)\n" +
                    "VALUES ('" + id + "', '" + "JSD" + id + "', '" + outNo + "', '" + fromUserId + "', " + fromUserType + ", '" + userId + "', " + userType + ",\n" +
                    "        " + amount + ", '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "', " + profitState + ", '" + acctCircle + "', " + 1 + ", " + (StringUtils.isBlank("123") ? null : "'" + "contributeUserId" + "'") + ", " + (StringUtils.isBlank("relationBrandOwnerId") ? null : "'" + "relationBrandOwnerId" + "'") + ", '" + remark + "',\n" +
                    "        '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "');";
            cloudStatement.execute("delete from dws_profit_record_hdj_flink_api where id = '" + id + "'");
            System.out.println("insertSql--->"+insertSql);
            cloudStatement.execute(insertSql);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // 在这里关闭 JDBC 连接
            coalitiondbStatement.close();
            coalitiondbConnection.close();
            cloudStatement.close();
            cloudConnection.close();
        }
    }

    /**
     * 清晰数据
     *
     * @param source
     * @return
     */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try {
                    LOG.info("============================row:{}", row);
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    //history,insert,update
                    if (Arrays.asList("r", "c", "u").contains(op)) {
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    } else {
                        LOG.info("filter other op:{}", op);
                    }
                } catch (Exception ex) {
                    LOG.warn("filter other format binlog:{}", row);
                }
            }
        });
    }

    /**
     * 过滤数据
     *
     * @param source
     * @param table
     * @return
     */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject source = rowJson.getJSONObject("source");
                    String tbl = source.getString("table");
                    return table.equals(tbl);
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }

    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for (JSONObject jsob : tableList) {
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName + "." + tblName;
            if (SYNC_TABLES.contains(schemaTbl)) {
                tables.add(tblName);
            }
        }
        return tables;
    }
}

  JdbcUtil.java

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class JdbcUtil {

    static {
        try {
//            Class.forName("com.mysql.cj.jdbc.Driver");
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);

    public static void main(String[] args) throws SQLException {
    }

    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
        List<JSONObject> beJson = new ArrayList<>();
        String connectionUrl = String.format("jdbc:mysql://%s:%s/league_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai", hostUrl, port);
        Connection con = null;
        try {
            con = DriverManager.getConnection(connectionUrl, user, password);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            beJson = resultSetToJson(rs);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
            }
        }
        return beJson;
    }

    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}

  pom.xml:

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.0</version>
        </dependency>
2.1 Mysql 打开 bin-log 功能

  og_bin 的Value如果为ON代表开启,如果为OFF代表关闭,MySQL8.0默认是开启的:

# 查看是否开启binlog
mysql> SHOW VARIABLES LIKE '%log_bin%';

  关闭状态:

在这里插入图片描述

  • log_bin为ON代表MySQL已经开启binlog日志记录
  • log_bin_basename配置了binlog的文件路径及文件前缀名
  • log_bin_index配置了binlog索引文件的路径

  开启状态:

在这里插入图片描述

# 在centos中mysql的配置文件一般都在/etc/mysql目录下,如果不在可以通过 find / -name "my.cnf" 查找
vi /etc/mysql/my.cnf

# 服务ID
server-id=1
# binlog 配置 只要配置了log_bin地址 就会开启
log_bin = /var/lib/mysql/mysql_bin
# 日志存储天数 默认0 永久保存
# 如果数据库会定期归档,建议设置一个存储时间不需要一直存储binlog日志,理论上只需要存储归档之后的日志
expire_logs_days = 30
# binlog最大值
max_binlog_size = 1024M
# 规定binlog的格式,binlog有三种格式statement、row、mixad,默认使用statement,建议使用row格式
binlog_format = ROW
# 在提交n次事务后,进行binlog的落盘,0为不进行强行的刷新操作,而是由文件系统控制刷新日志文件,如果是在线交易和账有关的数据建议设置成1,如果是其他数据可以保持为0即可
sync_binlog = 1

# 重启MySQL服务使配置生效
systemctl restart mysqld / service mysql restart

# 查看日志列表
SHOW MASTER LOGS;

可参考:MySQL 开启配置binlog以及通过binlog恢复数据

2.2 在 Mysql 中建库建表准备
CREATE DATABASE IF NOT EXISTS cloud_test;
CREATE DATABASE IF NOT EXISTS league_test;

CREATE TABLE league_test.oc_settle_profit (
        id                           varchar(32),
        show_profit_id               varchar(32),
        order_no                     varchar(32),
        from_user_id                 varchar(32),
        from_user_type               int(11),
        user_id                      varchar(32),
        user_type                    int(11),
        rate                         int(11),
        amount                       int(11),
        type                         int(11),
        add_time                     datetime,
        state                        int(11),
        expect_profit_time           datetime,
        profit_time                  datetime,
        profit_mode                  int(11),
        opt_code                     varchar(32),
        opt_name                     varchar(32),
        acct_circle                  varchar(32),
        process_state                int(11),
        parent_id                    varchar(32),
        keep_account_from_user_id    varchar(32),
        keep_account_from_bm_user_id varchar(32),
        keep_account_user_id         varchar(32),
        keep_account_bm_user_id      varchar(32),
        biz_type                     int(11),
        remark                       varchar(32),
        contribute_user_id           varchar(32),
        relation_brand_owner_id      varchar(32),
        PRIMARY KEY (id) USING BTREE
);

CREATE TABLE cloud_test.dws_profit_record_hdj_flink_api (
        id                      varchar(32),
        show_profit_id          varchar(32),
        order_no                varchar(32),
        from_user_id            varchar(32),
        from_user_type          int(11),
        user_id                 varchar(32),
        user_type               int(11),
        amount                  int(11),
        profit_time             datetime,
        state                   int(11),
        acct_circle             varchar(32),
        biz_type                int(11),
        contribute_user_id      varchar(32),
        relation_brand_owner_id varchar(32),
        remark                  varchar(32),
        add_time                datetime,
        PRIMARY KEY (id) USING BTREE
        );
2.3 遇到的坑

  用 JDBC 连接 Mysql 的时候报错:The MySQL server has a timezone offset (0 seconds ahead of UTC)

  原因:从错误即可知道是时区的错误。

show variables like '%time_zone%';
Variable_name   |Value |
----------------+------+
time_zone       |SYSTEM|

// 或者下面这条命令
SELECT @@global.time_zone;

  解决:使用 root 用户登录 mysql,再执行 set global time_zone='+8:00' 命令。

  注意:一开始改成了 SET GLOBAL time_zone = 'Asia/Shanghai',但并不好使。

2.4 测试

  Idea 启动程序后,在 oc_settle_profit 表中插入数据后 dws_profit_record_hdj_flink_api 也可以同步插入相应的数据。

参考:
【博学谷学习记录】超强总结,用心分享|大数据之flinkCDC
一次打通FlinkCDC同步Mysql数据

三、番外

  用 Flink CDC 可以监控 Mysql,但无法监控 StarRocks,和官方询问过,目前 StarRocks 并没有像 Mysql 这样被外部感知 DDL 操作的 bin-log 功能,所以暂时还无法用 Flink CDC 监控 StarRocks。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部