进入
DoubleCloud
https://www.double.cloud
创建一个kafka
1 选择语言
2 运行curl 的url命令启动一个topic
3 生成对应语言的token
4 复制3中的配置文件到本地,命名为client.properties
5 复制客户端代码
对python和java客户端代码进行了重写,java改成了kotlin:
配置文件
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
import time
from confluent_kafka import Producer, Consumer
import asyncio
import threading
class KafkaClient:
    """Small convenience wrapper around confluent_kafka's Producer and Consumer.

    Connection settings are loaded once from a ``key=value`` properties file
    and reused for every producer and consumer created by this object.
    """

    def __init__(self, config_file):
        """Load the client configuration from ``config_file``."""
        self.config = self.read_config(config_file)

    def read_config(self, config_file):
        """Parse a ``key=value`` properties file into a dict.

        Blank lines and lines starting with ``#`` are ignored. Only the first
        ``=`` splits a line, so values may themselves contain ``=``. Keys and
        values are stripped of surrounding whitespace.

        Raises:
            ValueError: if a non-comment line is not of the form ``key=value``.
        """
        config = {}
        with open(config_file) as fh:
            for line_number, raw_line in enumerate(fh, start=1):
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                parameter, separator, value = line.partition("=")
                parameter = parameter.strip()
                if not separator or not parameter:
                    raise ValueError(
                        f"{config_file}:{line_number}: expected 'key=value', got {line!r}"
                    )
                config[parameter] = value.strip()
        return config

    def produce(self, topic, key, value):
        """Send a single message to ``topic`` and block until it is flushed."""
        # Creates a new producer instance
        producer = Producer(self.config)
        # Produces a sample message
        producer.produce(topic, key=key, value=value)
        print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")
        # Send any outstanding or buffered messages to the Kafka broker
        producer.flush()

    def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):
        """Subscribe a new consumer to ``topic`` and run ``callback(consumer)`` to completion.

        ``callback`` must return an awaitable; it is driven on a fresh event
        loop owned by the calling thread. The callback receives the consumer
        and is expected to close it when done (assumption based on the
        callback used in this file). Without a callback the consumer is
        closed immediately.
        """
        # Work on a copy so the shared configuration (also read by produce(),
        # possibly from another thread) is never mutated.
        consumer_config = dict(self.config)
        consumer_config["group.id"] = group_id
        consumer_config["auto.offset.reset"] = auto_offset_reset
        consumer = Consumer(consumer_config)
        consumer.subscribe([topic])
        if callback is None:
            # Nobody will take ownership of the consumer: release it here.
            consumer.close()
            return
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(callback(consumer))
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def consume(self, topic, callback=None):
        """Run :meth:`consume_async` on a background thread and return that thread."""
        thread = threading.Thread(target=self.consume_async, args=(topic, callback,))
        thread.start()
        return thread
async def consume_async(consumer):
    """Wait for the first event on ``consumer``, report it, then close the consumer.

    Polls in one-second steps until something other than ``None`` is
    returned. A successful message is printed with its key and value decoded
    as UTF-8; an error event is printed instead of being dropped. The
    consumer is always closed on exit.

    Note: ``consumer.poll`` is a blocking call, so the event loop running this
    coroutine is blocked while waiting.
    """

    def _decode(raw):
        # Kafka messages may legitimately have no key (or no value).
        return raw.decode("utf-8") if raw is not None else ""

    try:
        msg = None
        while msg is None:
            msg = consumer.poll(1.0)
        if msg.error():
            print(f"Consumer error: {msg.error()}")
        else:
            key = _decode(msg.key())
            value = _decode(msg.value())
            print(f"Consumed message: key = {key:12} value = {value:12}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
# Demo: produce one message to the topic, then consume it on a background thread.
# A plain relative path resolves against the working directory on every OS
# (the previous ".\\client.properties" form only worked on Windows).
config_file_path = "client.properties"
topic = "test"
key = "key"
value = "value"
kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)
# Wait explicitly for the consumer thread to finish.
thread.join()
配置文件
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API_KEY>' password='<API_SECRET>';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Best practice for Kafka producer to prevent data loss
acks=all
java(kotlin)
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*
/**
 * Thin wrapper around a Kafka producer plus coroutine-driven consumers.
 *
 * The client is configured from a properties file, an in-memory [Properties] object, or both;
 * in-memory entries override entries loaded from the file. The merged configuration is used for
 * the producer and for every consumer started through [consume]. The default topic is taken from
 * the custom `topic` property (see [TOPIC]).
 *
 * Call [close] when done to stop the consumer loops and release the producer and thread pool.
 */
class KafkaClient<T, V> : Closeable {
    /** Name of the custom configuration property that holds the default topic. */
    val TOPIC = "topic"

    // Used both as the poll timeout and as the pause between polls, in milliseconds.
    private val pollIntervalMs = 100L
    private val poolSize = 10
    private val dispatcher = newFixedThreadPoolContext(poolSize, "CoroutinePool")
    private val scope = CoroutineScope(dispatcher)

    // File settings overlaid with the explicit overrides; shared by producer and consumers.
    private val settings: Properties
    private val producer: KafkaProducer<T, V>

    /**
     * @param configPath optional path of a properties file to load.
     * @param config optional in-memory settings; these win over values from [configPath].
     * @throws IllegalArgumentException if neither [configPath] nor [config] is given.
     * @throws IOException if [configPath] does not exist or cannot be read.
     */
    constructor(configPath: String? = null, config: Properties? = null) {
        require(configPath != null || config != null) { "don't have any config" }
        val merged = Properties()
        configPath?.let { merged.putAll(readConfig(it)) }
        config?.let { merged.putAll(it) }
        settings = merged
        producer = KafkaProducer(merged)
    }

    /**
     * Sends one record asynchronously.
     *
     * @param topic destination topic; when `null` the configured default topic is used.
     * @throws IllegalStateException if no topic is given and none is configured.
     */
    fun produce(key: T, value: V, topic: String? = null) {
        producer.send(ProducerRecord(resolveTopic(topic), key, value))
    }

    /**
     * Subscribes a new consumer to the configured default topic and hands every polled batch
     * (including empty ones) to [func] on the client's thread pool until the client is closed.
     *
     * The consumer is closed when its polling coroutine ends, whether through cancellation or
     * because [func] threw.
     *
     * @throws IllegalStateException if no default topic is configured.
     */
    fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {
        val topic = resolveTopic()
        val consumer: KafkaConsumer<T, V> = KafkaConsumer(settings)
        consumer.subscribe(listOf(topic))
        scope.launch {
            consumer.use { active ->
                while (true) {
                    func(active.poll(Duration.ofMillis(pollIntervalMs)))
                    // delay() is also the cancellation point that ends this loop on close().
                    delay(pollIntervalMs)
                }
            }
        }
    }

    /**
     * Loads a properties file.
     *
     * @throws IOException if [configFile] does not exist or cannot be read.
     */
    @Throws(IOException::class)
    fun readConfig(configFile: String): Properties {
        if (!Files.exists(Paths.get(configFile))) {
            throw IOException("$configFile not found.")
        }
        return Properties().apply {
            FileInputStream(configFile).use { load(it) }
        }
    }

    /** Cancels all consumer loops, shuts down the thread pool and closes the producer. */
    override fun close() {
        scope.cancel()
        dispatcher.close()
        producer.close()
    }

    /** Returns [explicit] if given, otherwise the non-blank `topic` property from the settings. */
    private fun resolveTopic(explicit: String? = null): String =
        explicit
            ?: settings.getProperty(TOPIC)?.takeIf { it.isNotBlank() }
            ?: throw IllegalStateException("no topic given and no '$TOPIC' property configured")
}
/**
 * Demo entry point: starts a consumer that prints every polled batch, produces one record to the
 * configured topic, waits a few seconds for it to be consumed, then closes the client.
 */
fun main() {
    KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties").use { cli ->
        cli.consume { records ->
            println("test beg")
            for (record in records) {
                // Report the topic the record actually came from, not the name of the config key.
                println(
                    "Consumed message from topic ${record.topic()}: " +
                        "key = ${record.key()} value = ${record.value()}"
                )
            }
            println("test end")
        }
        // Give some time for the consumer to start
        Thread.sleep(2000)
        cli.produce("key1", "test")
        // Give some time for the consumer to consume the message
        Thread.sleep(5000)
    }
}
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信
发表评论 取消回复