2

我正在尝试使用compression.type = gzip 镜像主题,但消息到达目标集群时没有压缩。我正在使用 kafka 连接器来运行 MM2。

我曾尝试过这些设置但没有成功:

compression.type = gzip

producer.compression.type = gzip

target.compression.type = gzip

我正在使用以下命令检查目标集群中的消息:

/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /path_to_log/00000000000000000000.log --print-data-log | grep compresscodec

MM 2 有人遇到过这个问题吗?或者任何提示?

4

1 回答 1

0

我能够打开 GZIP 压缩到 MM2 生产者到目标集群(Cluster2)。我不知道这是否是唯一且正确的方法,但我使用这样的配置:

#########################################################################
# specify any number of cluster aliases
clusters=Cluster1, Cluster2

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
Cluster1.bootstrap.servers=server1.mynetwork.net:9095
Cluster2.bootstrap.servers=server2.mynetwork.net:9093

# enable and configure individual replication flows
Cluster1->Cluster2.enabled=true

# regex which defines which topics gets replicated. For eg "foo-.*"
Cluster1->Cluster2.topics=.*

# Reverse
Cluster2->Cluster1.enabled=false
Cluster2->Cluster1.topics=.*

# Workers configs (Cluster2)
Cluster2.producer.compression.type=gzip
#########################################################################

在 MM2 日志中,我们可以看到这 2 个生产者开启了 GZIP 压缩选项:

[2021-01-27 14:52:19,006] INFO ProducerConfig values:
        acks = -1
        batch.size = 16384
        bootstrap.servers = [server2.mynetwork.net:9093]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = **connector-producer-MirrorSourceConnector-0**
        compression.type = gzip
.
.
.
--
[2021-01-27 14:52:19,023] INFO ProducerConfig values:
        acks = -1
        batch.size = 16384
        bootstrap.servers = [server2.mynetwork.net:9093]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = connector-producer-MirrorHeartbeatConnector-0
        compression.type = gzip
.
.
.

转储我的日志段,我得到了这个:

baseOffset: 15190 lastOffset: 15190 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1473759 CreateTime: 1611769946191 size: 110 magic: 2 compresscodec: GZIP crc: 1457322770 isvalid: true
| offset: 15190 CreateTime: 1611769946191 keysize: 20 valuesize: 10 sequence: -1 headerKeys: [] key:ClusterCluster1 payload: wD��O
于 2021-01-27T18:44:26.810 回答