I. Purpose
Some data types report at a 1-second frequency, so the data volume is very large. Parsing the full JSON in Hive, especially on a single machine, is far too slow to meet business requirements.
Flume interceptors are also not well suited to transforming this data, so the only workable approach is plain Java: consume messages from Kafka topic A, parse out the fields, and write the flattened records to Kafka topic B.
II. Raw Data Format
The JSON format is fairly complex: the object contains an array, and that array in turn contains objects.
{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:40:00",
    "data": {
        "cycle": 300,
        "sectionList": [{
            "sectionNo": 1,
            "coilList": [{
                "laneNo": 1,
                "laneType": null,
                "coilNo": 1,
                "volumeSum": 3,
                "volumePerson": 0,
                "volumeCarNon": 0,
                "volumeCarSmall": 3,
                "volumeCarMiddle": 0,
                "volumeCarBig": 0,
                "speedAvg": 24.15,
                "timeOccupancy": 0.98,
                "averageHeadway": 162.36,
                "averageGap": 161.63,
                "speed85": 38.0
            },
            {
                "laneNo": 8,
                "laneType": null,
                "coilNo": 8,
                "volumeSum": 1,
                "volumePerson": 0,
                "volumeCarNon": 0,
                "volumeCarSmall": 1,
                "volumeCarMiddle": 0,
                "volumeCarBig": 0,
                "speedAvg": 49.43,
                "timeOccupancy": 0.3,
                "averageHeadway": 115.3,
                "averageGap": 115.1,
                "speed85": 49.43
            }]
        }]
    }
}
III. Java Code
package com.kgc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaKafkaStatistics {

    // Kafka producer configuration for the target topic
    private static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");   // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        return props;
    }

    public static void main(String[] args) {
        // Kafka consumer configuration for the source topic
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually after each poll
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Each consumer application must use its own group ID
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "statistics_group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("topic_internal_data_statistics"));

        ObjectMapper mapper = new ObjectMapper();

        // Initialize the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    JsonNode rootNode = mapper.readTree(record.value());
                    System.out.println("raw record: " + rootNode);

                    // Top-level fields
                    String device_no = rootNode.get("deviceNo").asText();
                    String source_device_type = rootNode.get("sourceDeviceType").asText();
                    String sn = rootNode.get("sn").asText();
                    String model = rootNode.get("model").asText();
                    String create_time = rootNode.get("createTime").asText();

                    JsonNode dataNode = rootNode.get("data");
                    String cycle = dataNode.get("cycle").asText();

                    // data.sectionList is an array of section objects ...
                    for (JsonNode sectionStatus : dataNode.get("sectionList")) {
                        String section_no = sectionStatus.get("sectionNo").asText();

                        // ... and each section holds a coilList array; one output row per coil
                        JsonNode coilList = sectionStatus.get("coilList");
                        for (JsonNode coilItem : coilList) {
                            String lane_no = coilItem.get("laneNo").asText();
                            String lane_type = coilItem.get("laneType").asText();
                            String coil_no = coilItem.get("coilNo").asText();
                            String volume_sum = coilItem.get("volumeSum").asText();
                            String volume_person = coilItem.get("volumePerson").asText();
                            String volume_car_non = coilItem.get("volumeCarNon").asText();
                            String volume_car_small = coilItem.get("volumeCarSmall").asText();
                            String volume_car_middle = coilItem.get("volumeCarMiddle").asText();
                            String volume_car_big = coilItem.get("volumeCarBig").asText();
                            String speed_avg = coilItem.get("speedAvg").asText();
                            String speed_85 = coilItem.get("speed85").asText();
                            String time_occupancy = coilItem.get("timeOccupancy").asText();
                            String average_headway = coilItem.get("averageHeadway").asText();
                            String average_gap = coilItem.get("averageGap").asText();

                            // Flatten one coil entry into a comma-separated line
                            String outputLine = String.format(
                                    "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",
                                    device_no, source_device_type, sn, model, create_time, cycle,
                                    lane_no, lane_type, section_no, coil_no,
                                    volume_sum, volume_person, volume_car_non, volume_car_small,
                                    volume_car_middle, volume_car_big,
                                    speed_avg, speed_85, time_occupancy, average_headway, average_gap);

                            // Send the flattened line to the target topic
                            ProducerRecord<String, String> producerRecord =
                                    new ProducerRecord<>("topic_db_data_statistics", record.key(), outputLine);
                            producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {
                                if (e != null) {
                                    e.printStackTrace();
                                } else {
                                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                                }
                            });
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Manually commit offsets after processing the batch
            consumer.commitAsync();
        }
    }
}
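With the sample record from section II, every entry in coilList becomes one comma-separated line on topic_db_data_statistics (1 section × 2 coils = 2 lines here). Note that JSON null fields come through as the literal string "null", because Jackson's asText() renders a NullNode that way. The first coil, for example, flattens to:

39,null,null,null,2024-09-03 14:40:00,300,1,null,1,1,3,0,0,3,0,0,24.15,38.0,0.98,162.36,161.63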
For the remaining steps, refer to the earlier post 二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON), which walks through the same pipeline for simple JSON.
IV. Start the Kafka Topic Consumer
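The details are in the referenced post; as a quick check, the console consumer that ships with Kafka can watch the target topic (broker address taken from the Java code above; adjust the script path to your installation):

kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092 --topic topic_db_data_statistics --from-beginning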
V. Run and Test
VI. New ODS-Layer Table Structure
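The DDL itself is in the referenced post; as a minimal sketch, an external ODS table matching the 21 comma-separated output fields might look like the following. The database name, table name, and LOCATION are assumptions, and every column is STRING because the Java code emits all values as text:

CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_statistics (
    device_no          STRING,
    source_device_type STRING,
    sn                 STRING,
    model              STRING,
    create_time        STRING,
    cycle              STRING,
    lane_no            STRING,
    lane_type          STRING,
    section_no         STRING,
    coil_no            STRING,
    volume_sum         STRING,
    volume_person      STRING,
    volume_car_non     STRING,
    volume_car_small   STRING,
    volume_car_middle  STRING,
    volume_car_big     STRING,
    speed_avg          STRING,
    speed_85           STRING,
    time_occupancy     STRING,
    average_headway    STRING,
    average_gap        STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/ods/statistics';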
VII. Flume Collection Configuration File
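The full configuration is likewise in the referenced post; a representative sketch for this step would be a Kafka source on topic_db_data_statistics, a file channel, and an HDFS sink writing under the ODS table's location. The agent and component names, channel directories, and HDFS path below are assumptions; the broker and topic come from the Java code:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 192.168.0.70:9092
a1.sources.r1.kafka.topics = topic_db_data_statistics
a1.sources.r1.kafka.consumer.group.id = flume_statistics_group
a1.sources.r1.channels = c1

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flume/checkpoint/statistics
a1.channels.c1.dataDirs = /opt/flume/data/statistics

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.0.70:8020/data/ods/statistics
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.channel = c1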
VIII. Run the Flume Job and Check the HDFS Files and ODS Table Data
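Assuming the sketch above is saved as statistics.conf, the agent could be launched and the results spot-checked roughly like this (paths are assumptions):

flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file /opt/flume/conf/statistics.conf -Dflume.root.logger=INFO,console

hdfs dfs -ls /data/ods/statistics            # confirm files are landing
hdfs dfs -cat /data/ods/statistics/* | head  # spot-check the CSV lines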