工作中遇到这样的一个业务,业务方给的是一个视图,查了一下文档视图不能监听,这个时候想着要不要用datastream去自定义,然后发现flinksql也是可以实现
创建对应数据库和表

-- 创建班级表 tb_class
CREATE TABLE tb_class (
    class_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键
    class_name VARCHAR(50) NOT NULL           -- 班级名称
);

-- 创建学生表 tb_student
CREATE TABLE tb_student (
    student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键
    student_name VARCHAR(50) NOT NULL,          -- 姓名
    class_id INT,                               -- 班级ID
    FOREIGN KEY (class_id) REFERENCES tb_class(class_id)  -- 外键关联到 tb_class
);
-- 向 tb_class 表中插入数据
INSERT INTO tb_class (class_name) VALUES ('Class A');
INSERT INTO tb_class (class_name) VALUES ('Class B');
INSERT INTO tb_class (class_name) VALUES ('Class C');

-- 向 tb_student 表中插入数据
INSERT INTO tb_student (student_name, class_id) VALUES ('Alice', 1);
INSERT INTO tb_student (student_name, class_id) VALUES ('Bob', 2);
INSERT INTO tb_student (student_name, class_id) VALUES ('Charlie', 3);
INSERT INTO tb_student (student_name, class_id) VALUES ('Diana', 1);

-- 创建视图 tb_student_view
CREATE VIEW tb_student_view AS
SELECT 
    s.student_id AS student_id,       -- 主键
    s.student_name AS student_name,   -- 学生姓名
    c.class_name AS class_name        -- 班级名称
FROM 
    tb_student s
JOIN 
    tb_class c ON s.class_id = c.class_id;
		
		select * from tb_student_view;

-- 创建 ads_student 表
CREATE TABLE ads_student (
    student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键
    student_name VARCHAR(50) NOT NULL,          -- 学生姓名
    class_name VARCHAR(50) NOT NULL             -- 班级名称
);

编写对应的flinksql代码,这里没有flinksql客户端,只能在idea上完成了

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQLJoinExample {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建 Table 环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setParallelism(2);  // 将并行度设置为2,根据需要调整
        // 定义 tb_student 表
        tableEnv.executeSql(
                "CREATE TABLE tb_student (" +
                        "   student_id INT, " +
                        "   student_name STRING, " +
                        "   class_id INT ," +
                        "   PRIMARY KEY (student_id) NOT ENFORCED" +
                        ") WITH (" +
                        "   'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用
                        "    'hostname' = 'localhost'," +
                        "  'port' = '3307', " +
                        "   'username' = 'root', " +
                        "   'password' = '123456'," +
                        "    'database-name' = 't2', " +
                        "    'scan.incremental.snapshot.enabled' = 'false', " +

                        "   'table-name' = 'tb_student' " +
                        ")"
        );

        // 定义 tb_class 表
        tableEnv.executeSql(
                "CREATE TABLE tb_class (" +
                        "   class_id INT, " +
                        "   class_name STRING, " +
                        "   PRIMARY KEY (class_id) NOT ENFORCED" +
                        ") WITH (" +
                        "  'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用
                        "    'hostname' = 'localhost'," +
                        "  'port' = '3307', " +
                        "   'username' = 'root', " +
                        "   'password' = '123456'," +
                        "    'database-name' = 't2', " +
                        "    'scan.incremental.snapshot.enabled' = 'false', " +

                        "   'table-name' = 'tb_class' " +
                        ")"
        );


        // 定义 ads_student 表,使用 jdbc 连接器作为目标表
        tableEnv.executeSql(
                "CREATE TABLE ads_student (" +
                        "   student_id INT, " +
                        "   student_name STRING, " +
                        "   class_name STRING, " +
                        "   PRIMARY KEY (student_id) NOT ENFORCED" +
                        ") WITH (" +
                        "   'connector' = 'jdbc', " +
                        "   'url' = 'jdbc:mysql://localhost:3307/t2', " +
                        "   'table-name' = 'ads_student', " +
                        "   'username' = 'root', " +
                        "   'password' = '123456' " +
                        ")"
        );

        // 执行 JOIN 查询并插入到 ads_student 表
        tableEnv.executeSql(
                "INSERT INTO ads_student " +
                        "SELECT s.student_id, s.student_name, c.class_name " +
                        "FROM tb_student AS s " +
                        "JOIN tb_class AS c ON s.class_id = c.class_id"
        );

        // 启动任务
        env.execute("Flink SQL Join Example");
    }
}

注意要加上额外的pom依赖

# sink 使用的是jdbc连接的方式
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.2.0-1.19</version>
    </dependency>


<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.15.0</version>
</dependency>

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部