文章目录
前言
提示:这里主要总结在工作中使用到的和遇到到的问题:Java flink版本1.15+
一、flink 写kafka
1.第一种使用FlinkKafkaProducer API
// 假设有一个DataStream<String> named text
DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // 目标Kafka topic
new SimpleStringSchema(), // 序列化schema
props, // 生产者配置
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 语义保证
// 添加sink
text.addSink(myProducer);
此FlinkKafkaConsumer或FlinkKafkaProducer API 在flink1.15版本后,已经被弃用
。推出了新的消费kafka 的 API KafkaSource和KafkaSink。
2.第二种使用自定义序列化器
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<JSONObject> {
private static final long serialVersionUID = 8497940668660042203L;
private String topic;
public CustomKafkaSerializationSchema(final String topic) {
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(final JSONObject element, final Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());
}
}
在低版本的flink-connector-kafka中,不支持KafkaSerializationSchema
3.第三种使用FlinkKafkaProducer011 API
// 假设有一个DataStream<String>
DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");
// Kafka 生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 使用 FlinkKafkaProducer011 写入 Kafka
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
"my-topic", // 目标 Kafka topic
new SimpleStringSchema(), // 序列化 schema
props, // 生产者配置
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); // 语义保证
// 添加 sink
text.addSink(myProducer);
使用FlinkKafkaProducer011有问题,由于flink1.15+版本FlinkKafkaProducer
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » flink读写案例合集
发表评论 取消回复