亦菲、彦祖们,今天使用idea开发的时候,运行flink程序(读取kafka主题数据)的时候,发现操作台什么数据都没有只有满屏红色日志输出,关键干嘛?一点报错都没有,一开始我觉得应该执行程序的姿势有问题,然后我重新执行了一次还是不行,我就一直等待,发现等了好久都没有数据来到,我就开始察觉不对了。

        下面是我排查的思路:

        1.kafka broker有没有数据:因为我是读取kafka主题数据,所以我屁颠屁颠的去kakfa查看我的消费主题是否有数据,查看没有问题!

        2.读取的主题是否出现问题:经过切换其他主题读取数据,发现也是没有数据出现在操作台,所以不是主题的问题

        3.查看flink与kafka 连接器的配置是否有问题:我就回去查看构建kafka连接器的builder是否问题,我尝试把偏移量改为从最早的偏移量开始读取,也是无动于衷呀!

        通过以上思路之后,我就彻底无语了,那到底是什么问题?

因为我是从flink连接kafka读取数据的,所以我觉得直接连接kafka读取主题数据试一试,这样就可以排除是不是flink有问题了,所以我就写了以下代码进行测试:

 // 配置 Kafka 消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  "hadoop101:10092,hadoop102:10092,hadoop103:10092"); // Kafka 集群地址
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 ID
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Key 反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Value 反序列化器
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的偏移量开始读取

        // 创建消费者、
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("AllData_topic_ods"));
        //拉取超时时间
        ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(6000));
        for (ConsumerRecord<String, String> record : poll) {
            System.out.printf("offset = %d,key= %s.value=%s%n",record.offset(),record.key(),record.value());
        }
        consumer.close();

         一开始当拉取超时时间为100ms的时候,我也是消费不到数据的,但是我就想是不是我拉取超时时间太短了,因为我网络io和电脑性能匹配不上的话,它拉取时间是需要进行网络io的。所以我尝试修改拉取时间为6000ms,就是6s啦!
        然后突然就消费到数据了,我了个豆,搞定了我感觉我已经!

        100ms

        6000ms 

        于是我就回去把我那个builder的参数也修改了,一执行flink程序,这次不负众望,成功消费到数据了!!!

return KafkaSource.<String>builder()
                .setProperty("max.poll.interval.ms","10000") // 设置拉取超时时间为10s
                .setProperty("partition.discovery.interval.ms", "10000")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setProperty("isolation.level", "read_committed")//read_committed 只会读取事务型成功提交事务写入的消息;  read_uncommitted 默认值,能够读取到 Kafka 写入的任何消息
                .setBootstrapServers(bootstrapServers)
                .setTopics(topicName)
                .setGroupId(groupId)
                .setClientIdPrefix(clientIdPrefix)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));

        亦菲、彦祖们,搞定了!如果不是这个问题的话,也参照我上面的排查思路看看是哪里出现了问题!我能解决也是一个一个排查到,给点耐心。 

        如果帮到你,恭喜呀!如果解决不了,那当我没说,你去看别人的文章吧! 



 感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!! 

关注我下一篇不迷路哦!

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部