高级别问题
我在本地运行 kafka,并且正在使用压缩主题。当我运行命令行生产者和消费者时,我可以验证正在发生压缩,但是当我使用 sarama(“github.com/Shopify/sarama”)生产者时,似乎没有发生日志压缩。
验证日志压缩
首先,我使用以下命令创建了一个主题:
bin/kafka-topics.sh --zookeeper localhost:2181 \
--create --topic andrew.topic \
--config "cleanup.policy=compact" \
--config "delete.retention.ms=100" \
--config "segment.ms=100" \
--config "min.cleanable.dirty.ratio=0.01" \
--partitions 1 \
--replication-factor 1
接下来,我使用以下内容向它生成几条消息:
for i in $(seq 0 10); do \
echo "sameKey123:differentMessage$i" | bin/kafka-console-producer.sh \
--broker-list localhost:9091 \
--topic andrew.topic \
--property "parse.key=true" \
--property "key.separator=:"; \
done
最后验证是否发生了日志压缩:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 \
--topic andrew.topic \
--property print.key=true \
--property key.separator=" : " \
--from-beginning
哪个打印:
sameKey123 : differentMessage9
sameKey123 : differentMessage10
因此,正在对 andrew.topic 主题进行日志压缩。
现在使用萨拉马
现在我使用 sarama 为同一主题生成消息,如下所示:
package main
import (
"fmt"
"github.com/Shopify/sarama"
"os"
"os/signal"
)
func main() {
sendMessages()
}
func sendMessages() {
producer, err := sarama.NewSyncProducer([]string{"localhost:9091"}, nil)
if err != nil {
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
panic(err)
}
}()
for i := 0; i <= 10; i++ {
pm := &sarama.ProducerMessage{
Topic: "andrew.topic",
Key: sarama.StringEncoder("sameSaramaKey123"),
Value: sarama.StringEncoder(fmt.Sprintf("differentMessage%v", i)),
}
_, _, err := producer.SendMessage(pm)
if err != nil {
panic(err)
}
}
}
在命令行重新启动消费者后,我看到以下输出
sameKey123 : differentMessage9
sameKey123 : differentMessage10
sameSaramaKey123 : differentMessage0
sameSaramaKey123 : differentMessage1
sameSaramaKey123 : differentMessage2
sameSaramaKey123 : differentMessage3
sameSaramaKey123 : differentMessage4
sameSaramaKey123 : differentMessage5
sameSaramaKey123 : differentMessage6
sameSaramaKey123 : differentMessage7
sameSaramaKey123 : differentMessage8
sameSaramaKey123 : differentMessage9
sameSaramaKey123 : differentMessage10
此处未进行日志压缩。无论我重启消费者多少次,或者我使用 sarama 日志压缩生成多少消息,似乎都没有发生。
更怪异
如果在使用 sarama 生成消息之后,我会在命令行日志压缩中生成更多消息,然后发生
运行 sarama producer 后运行终端生产者后得到以下输出
sameSaramaKey123 : differentMessage10
sameKey123 : differentMessage9
sameKey123 : differentMessage10
在终端运行生产者后,所有消息都会发生日志压缩,包括之前由 sarama 生成的消息。
为什么会这样?我该如何解决?