前言: 

通过sinkTo()的优点:更简洁、类型安全,适用于使用 Flink 提供的预定义 sink 或简单的自定义 sink

准备:

 引入Flink 1.12版本即可

    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka</artifactId>
         <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.19.1</version>
    </dependency>

创建任务:

package com.iterge.flink.job;


import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/30 16:20
 * @description kafka to kafka
 */

@Slf4j
public class KafkaToKafkaDemo {

    static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static void main(String[] args) throws Exception {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("it.erge.test.topic")
                .setGroupId("it.erge.test.topic.6")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> msg = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        //添加sink
        KafkaSink<String> build = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("it.lph.test.topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        msg.sinkTo(build);
        env.execute();
    }
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部