网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0

/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {
	return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {
	Name string `json:"name"`
	Pwd  string `json:"pwd"`
}


// writeByConn 基于Conn发送消息
func writeByConn() {

	// 连接至Kafka集群的Leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	people1 := People{"Tmo",
		"124"}
	people2 := People{"Mac",
		"124"}
	people3 := People{"Joker",
		"124"}
	// 发送消息
	str1, _ := json.Marshal(people1)
	str2, _ := json.Marshal(people2)
	str3, _ := json.Marshal(people3)
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte(str1)},
		kafka.Message{Value: []byte(str2)},
		kafka.Message{Value: []byte(str3)},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {
	// 指定要连接的topic和partition

	// 连接至Kafka的leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	// 设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	// 读取一批消息,得到的batch是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	// 遍历读取消息
	b := make([]byte, 10e3) // 10KB max per message
	for {
		p := People{}
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		err = json.Unmarshal(b[:n], &p)
		if err != nil {
			fmt.Println(string(b))
			fmt.Println(err, "**************")
			continue
		}
		fmt.Println(p)
	}
	// 关闭batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}
	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

完整代码

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {
	Name string `json:"name"`
	Pwd  string `json:"pwd"`
}

/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0

/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {
	return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

func main() {
	writeByConn()
	readByConn()
	
}

// writeByConn 基于Conn发送消息
func writeByConn() {

	// 连接至Kafka集群的Leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	people1 := People{"Tmo",
		"124"}
	people2 := People{"Mac",
		"124"}
	people3 := People{"Joker",
		"124"}
	// 发送消息
	str1, _ := json.Marshal(people1)
	str2, _ := json.Marshal(people2)
	str3, _ := json.Marshal(people3)
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte(str1)},
		kafka.Message{Value: []byte(str2)},
		kafka.Message{Value: []byte(str3)},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

// readByConn 连接至kafka后接收消息
func readByConn() {
	// 指定要连接的topic和partition

	// 连接至Kafka的leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	// 设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	// 读取一批消息,得到的batch是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	// 遍历读取消息
	b := make([]byte, 10e3) // 10KB max per message
	for {
		p := People{}
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		err = json.Unmarshal(b[:n], &p)
		if err != nil {
			fmt.Println(string(b))
			fmt.Println(err, "**************")
			continue
		}
		fmt.Println(p)
	}
	// 关闭batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}
	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{host},
		GroupID:  "consumer-group-id",
		Topic:    topic,
		MaxBytes: 10e6, // 10MB
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

完整代码

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {
	Name string `json:"name"`
	Pwd  string `json:"pwd"`
}

/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0

/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {
	return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

func main() {
	writeByConn()
	readByConn()
}

// writeByConn 基于Conn发送消息
func writeByConn() {

	// 连接至Kafka集群的Leader节点
	conn, err := NewKafKaCon()
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	people1 := People{"Tmo",
		"124"}
	people2 := People{"Mac",
		"124"}
	people3 := People{"Joker",
		"124"}
	// 发送消息
	str1, _ := json.Marshal(people1)
	str2, _ := json.Marshal(people2)
	str3, _ := json.Marshal(people3)
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte(str1)},
		kafka.Message{Value: []byte(str2)},
		kafka.Message{Value: []byte(str3)},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}



func readByConn() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{host},
		GroupID:  "consumer-group-id",
		Topic:    topic,
		MaxBytes: 10e6, // 10MB
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部