I am very new to Go and am trying to make some adjustments to an open-source library that consumes messages from Kafka using the Sarama library. The original code can be found here.
The original package implements a PartitionConsumer, which works fine if read consistency across multiple consumers of the same topic is not required; however, that does not work for me.
Within the same application I have done some work to implement the sarama NewConsumerGroup package, using some examples I found online.
Below is the code I currently have running:
package main

import (
	"context"
	"encoding/json"
	// "flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
	// "github.com/Shopify/sarama/mocks"
)

// KafkaInput is used for receiving Kafka messages and
// transforming them into HTTP payloads.
type KafkaInput struct {
	config *KafkaConfig
	// consumers []sarama.PartitionConsumer
	messages chan *sarama.ConsumerMessage
}

var (
	brokers  = ""
	version  = ""
	group    = ""
	topics   = ""
	assignor = ""
	oldest   = true
	verbose  = false
)

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// NewKafkaInput creates an instance of the Kafka consumer client.
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	c := sarama.NewConfig()
	// Configuration options go here

	log.Println("Starting a new Sarama consumer")

	if verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	version, err := sarama.ParseKafkaVersion("2.1.1")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}
	c.Version = version

	if oldest {
		c.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	/**
	 * Setup a new Sarama consumer group
	 */
	consumer := Consumer{ready: make(chan bool)}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, []string{config.topic}, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}

	i := &KafkaInput{
		config: config,
		// consumers: make([]sarama.PartitionConsumer, len(partitions)),
		// messages: make(chan *sarama.ConsumerMessage, 256),
		messages: make(chan *sarama.ConsumerMessage, 256),
	}
	return i
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}
	return nil
}

// ErrorHandler should receive errors
func (i *KafkaInput) ErrorHandler(consumer sarama.PartitionConsumer) {
	for err := range consumer.Errors() {
		log.Println("Failed to read access log entry:", err)
	}
}

// Read reads the next consumed message into data.
func (i *KafkaInput) Read(data []byte) (int, error) {
	message := <-i.messages

	if !i.config.useJSON {
		copy(data, message.Value)
		return len(message.Value), nil
	}

	var kafkaMessage KafkaMessage
	if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
		log.Println("Failed to decode access log entry:", err)
		return 0, err
	}

	buf, err := kafkaMessage.Dump()
	if err != nil {
		log.Println("Failed to decode access log entry:", err)
		return 0, err
	}

	copy(data, buf)
	return len(buf), nil
}

func (i *KafkaInput) String() string {
	return "Kafka Input: " + i.config.host + "/" + i.config.topic
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
The KafkaConfig carries the groupID and topic for the consumer (its definition isn't shown here; a rough sketch of it follows the next snippet). When I run this program, the consumer starts up, reads from the correct topic with the correct group, and prints the messages to STDOUT from the ConsumeClaim created in this function:
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}
	return nil
}
What I believe I need, however, is for the NewKafkaInput function to return a *KafkaInput with the messages from the claim added to the struct (forgive me if I am using the wrong terminology here; this is my first Go rodeo):
...
	i := &KafkaInput{
		config: config,
		// consumers: make([]sarama.PartitionConsumer, len(partitions)),
		// messages: make(chan *sarama.ConsumerMessage, 256),
		messages: make(chan *sarama.ConsumerMessage, 256),
	}
	return i
}
In the original example, this is done here:
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
	...
	go func(consumer sarama.PartitionConsumer) {
		defer consumer.Close()
		for message := range consumer.Messages() {
			i.messages <- message
		}
	}(consumer)
	...
}
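My best guess at the consumer-group equivalent is to hand the Consumer a send-only reference to KafkaInput's channel and forward each claimed message from ConsumeClaim, something like this sketch (the messages field on Consumer is my own addition and appears nowhere in the original code):

// Sketch: Consumer carries a send-only reference to KafkaInput's channel.
type Consumer struct {
	ready    chan bool
	messages chan<- *sarama.ConsumerMessage // my addition: claimed messages go here
}

// ConsumeClaim would then forward each message into the channel instead of
// only logging it, so that KafkaInput.Read has something to receive.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		consumer.messages <- message
		session.MarkMessage(message, "")
	}
	return nil
}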
I have spent days toying with moving functions in and out of the NewKafkaInput function, attempting to add the messages to the struct outside of the function, and everything in between. I just can't get it to work. The NewKafkaInput function needs to return a *KafkaInput with any messages, so that this function can complete:
func (i *KafkaInput) Read(data []byte) (int, error) {
	message := <-i.messages

	if !i.config.useJSON {
		copy(data, message.Value)
		return len(message.Value), nil
	}

	var kafkaMessage KafkaMessage
	if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
		log.Println("Failed to decode access log entry:", err)
		return 0, err
	}

	buf, err := kafkaMessage.Dump()
	if err != nil {
		log.Println("Failed to decode access log entry:", err)
		return 0, err
	}

	copy(data, buf)
	return len(buf), nil
}
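Putting those pieces together, the following untested sketch is roughly what I imagine NewKafkaInput has to become: kick off the Consume loop in the background and return the *KafkaInput immediately, with the signal handling, cancel(), and client.Close() moved out to wherever shutdown is managed. It builds on the Consumer sketch above, so again the messages field is my own invention:

// Untested sketch: same setup as my current version, but the function
// returns as soon as the first consumer-group session is ready.
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
	c := sarama.NewConfig()
	version, err := sarama.ParseKafkaVersion("2.1.1")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}
	c.Version = version
	c.Consumer.Offsets.Initial = sarama.OffsetOldest

	i := &KafkaInput{
		config:   config,
		messages: make(chan *sarama.ConsumerMessage, 256),
	}
	consumer := Consumer{ready: make(chan bool), messages: i.messages}

	client, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	go func() {
		ctx := context.Background() // no cancellation here; shutdown handled elsewhere
		for {
			// Consume blocks for the lifetime of one session and returns
			// on rebalance, so loop to rejoin the group.
			if err := client.Consume(ctx, []string{config.topic}, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // block only until the first session is set up
	return i
}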
It is entirely possible I am making a complete mess of this as well, but any help and input is appreciated.
Thanks