我们观察到 Java Kafka Producer 0.9 客户端在发送小消息时性能非常差。消息不会累积到更大的请求批次中,因此每个小记录都是单独发送的。
我们的客户端配置有什么问题?还是这是其他问题?
使用 Kafka 客户端 0.9.0.0。我们在 Kafka 未发布的 9.0.1 或 9.1 固定或未解决列表中没有看到任何相关的帖子,因此我们专注于我们的客户端配置和服务器实例。
我们理解 linger.ms 应该导致客户端将记录累积到一个批次中。
我们将 linger.ms 设置为 10(也尝试了 100 和 1000),但这些并没有导致批量累积记录。对于大约 100 字节的记录大小和 16K 的请求缓冲区大小,我们预计在单个请求中发送大约 160 条消息。
尽管已经分配了一个新的 Bluemix Messaging Hub (Kafka Server 0.9) 服务实例,但客户端的跟踪似乎表明该分区可能已满。测试客户端在没有其他 I/O 的情况下循环发送多条消息。
日志显示带有可疑行的重复序列:“ Wake up the sender since topic mytopic partition 0 is full or getting a new batch ”。
因此,在我们的测试用例中,新分配的分区本质上应该是空的,那么为什么生产者客户端会得到一个新批次呢?
2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送记录:Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 发送记录 ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B @4923ab24 回调 null 到主题 mytopic 分区 0
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - 为主题 mytopic 分区 0 分配一个新的 16384 字节消息缓冲区
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 由于主题 mytopic 分区 0 已满或获得新批次,因此唤醒发件人
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExplorerProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 准备发送数据的节点:[Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 创建了 1 个生产请求:[ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request= RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0, record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 从节点 0 接收到相关 ID 为 11 的生产响应
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExplorerProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - 向主题分区 mytopic-0 生成消息,基本偏移量为 130,错误:null。
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送返回的元数据:Topic='mytopic',Partition=0,Offset=130
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送记录:Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'
对于发送的每条记录,日志条目都像上面一样重复
我们提供了以下属性文件:
2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - 从 Kafka 客户端文件中检索到的属性:kafka-producer.properties 2015-12-10 15:14:37,909 251 [主要] 信息 com.isllc.client.AbstractClient-acks=-1 2015-12-10 15:14:37,909 251 [主要] 信息 com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2 2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - client.id=ExploreProducer 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.password=changeit 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.type=JKS 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib /安全/cacerts 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub .services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05 -prod01.messagehub.services.us-south.bluemix.net:9094 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient-security.protocol=SASL_SSL 另外,我们在代码中添加了 linger.ms=10。
Kafka 客户端显示扩展/合并的配置列表(并显示 linger.ms 设置):
2015-12-10 15:14:37,970 312 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig 值:
压缩类型=无
metric.reporters = []
元数据.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
重新连接.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us -south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
重试.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
缓冲区.内存 = 33554432
超时.ms = 30000
key.serializer = 类 org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
连接数.max.idle.ms = 540000
ssl.truststore.password = [隐藏]
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = 探索生产者
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLSv1.2
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
确认 = -1
批量大小 = 16384
ssl.keystore.location = null
接收缓冲区字节 = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_SSL
重试 = 0
最大请求大小 = 1048576
value.serializer = 类 org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
指标.sample.window.ms = 30000
partitioner.class = 类 org.apache.kafka.clients.producer.internals.DefaultPartitioner
发送缓冲区字节 = 131072
逗留.ms = 10
发送 100 条记录后的 Kafka 指标:
100 次发送的持续时间为 8787 毫秒。发送了 7687 个字节。
batch-size-avg = 109.87 [每个请求每个分区发送的平均字节数。]
batch-size-max = 110.0 [每个请求每个分区发送的最大字节数。]
buffer-available-bytes = 3.3554432E7 [未使用的缓冲内存总量(未分配或在空闲列表中)。]
buffer-exhausted-rate = 0.0 [由于缓冲区耗尽而丢弃的平均每秒记录发送数]
buffer-total-bytes = 3.3554432E7 [客户端可以使用的最大缓冲内存量(无论当前是否使用)。]
bufferpool-wait-ratio = 0.0 [appender 等待空间分配的时间分数。]
字节率 = 291.8348916277093 []
压缩率 = 0.0 []
compression-rate-avg = 0.0 [记录批次的平均压缩率。]
connection-close-rate = 0.0 [窗口中每秒关闭的连接数。]
connection-count = 2.0 [当前活动连接数。]
connection-creation-rate = 0.05180541884681138 [窗口中每秒建立的新连接。]
传入字节率 = 10.342564641029007 []
io-ratio = 0.0038877559207471236 [I/O 线程花费在 I/O 上的时间分数]
io-time-ns-avg = 353749.2840375587 [每次选择调用的 I/O 平均时间长度,以纳秒为单位。]
io-wait-ratio = 0.21531227995769162 [I/O 线程等待的时间分数。]
io-wait-time-ns-avg = 1.9591901192488264E7 [I/O 线程等待套接字准备好读取或写入的平均时间,以纳秒为单位。]
metadata-age = 8.096 [当前使用的生产者元数据的年龄,以秒为单位。]
network-io-rate = 5.2937784999213795 [每秒所有连接上的平均网络操作(读取或写入)数。]
传出字节率 = 451.2298783403283 []
generate-throttle-time-avg = 0.0 [平均油门时间,以毫秒为单位]
producer-throttle-time-max = 0.0 [最大油门时间,以毫秒为单位]
record-error-rate = 0.0 [导致错误的平均每秒记录发送数]
record-queue-time-avg = 15.5 [在记录累加器中花费的平均记录批次的毫秒时间。]
record-queue-time-max = 434.0 [在记录累加器中花费的最大记录批次时间,以毫秒为单位。]
记录重试率 = 0.0 []
record-send-rate = 2.65611304417116 [每秒发送的平均记录数。]
record-size-avg = 97.87 [平均记录大小]
record-size-max = 98.0 [最大记录大小]
records-per-request-avg = 1.0 [每个请求的平均记录数。]
request-latency-avg = 0.0 [平均请求延迟,以毫秒为单位]
请求延迟最大值 = 74.0 []
request-rate = 2.6468892499606897 [每秒发送的平均请求数。]
request-size-avg = 42.0 [窗口中所有请求的平均大小..]
request-size-max = 170.0 [在窗口中发送的任何请求的最大大小。]
requests-in-flight = 0.0 [等待响应的当前正在进行的请求数。]
response-rate = 2.651196976060479 [每秒收到的平均响应数。]
select-rate = 10.989861465830819 [I/O 层每秒检查新 I/O 执行的次数]
waiting-threads = 0.0 [阻塞等待缓冲区内存入队记录的用户线程数]
谢谢