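I am running xUnit functional tests in a docker compose stack based on Debian 3.1-buster, with the Confluent Kafka .NET Client v1.5.3 connecting to the broker confluentinc/cp-kafka:6.0.1. I am fairly new to Kafka...
The architecture is shown in the diagram below:
I am using xUnit for the tests and have a class fixture that starts an in-process generic Kestrel host for the lifetime of the test collection/class. I am using an in-process generic host because I have an additional SignalR service that uses WebSockets. As I understand it, WebApplicationFactory is in-memory only and does not use network sockets.
The generic host contains a Kafka producer and a consumer. The producer is a singleton service that produces using the Produce method. The consumer is a BackgroundService that runs a Consume loop with a cancellation token (see the listing further below). The consumer has the following configuration: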
- EnableAutoCommit: true
- EnableAutoOffsetStore: false
- AutoOffsetReset: AutoOffsetReset.Latest
It is a single consumer reading a topic with 3 partitions. group.initial.rebalance.delay is configured as 1000ms.
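For reference, the settings listed above correspond roughly to the following `ConsumerConfig`. This is only a minimal sketch: the `ConsumerSettings` class, the `Build` method and its parameters are hypothetical names for illustration, and my real configuration is bound from the settings file rather than built like this.

```csharp
using Confluent.Kafka;

public static class ConsumerSettings
{
    // Hypothetical helper: builds a consumer configuration equivalent to the list above.
    public static ConsumerConfig Build(string bootstrapServers, string groupId) =>
        new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            // Offsets are committed periodically in the background...
            EnableAutoCommit = true,
            // ...but only offsets explicitly passed to StoreOffset are eligible for commit.
            EnableAutoOffsetStore = false,
            // With no committed offset for the group, start at the end of each partition.
            AutoOffsetReset = AutoOffsetReset.Latest
        };
}
```

With `AutoOffsetReset.Latest` and no committed offsets for the group, the consumer starts at the end of each partition, so it only receives messages produced after its partitions have been assigned.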
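The test spawns a thread that sends an event to trigger the producer to publish data to the Kafka topic. The test then waits for a time delay, using a ManualResetEvent, to give the consumer time to process the topic data.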
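The same publish-then-wait step can be written without a dedicated thread or a ManualResetEvent. The sketch below is an assumption about how that might look, not what the test currently does; `TestWait` and `PublishAndWaitAsync` are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class TestWait
{
    // Hypothetical helper: runs the publish action, then waits asynchronously
    // for a fixed settle delay without blocking a thread.
    public static async Task PublishAndWaitAsync(Func<Task> publish, TimeSpan settleDelay)
    {
        await publish();
        await Task.Delay(settleDelay);
    }
}
```

In the test this would be called as `await TestWait.PublishAndWaitAsync(() => _Fixture.Host.MqttClient.PublishAsync(_Fixture.Data.Message), TimeSpan.FromSeconds(14));`. Because nothing blocks synchronously, this would at least rule out the wait itself as something that holds up the in-process host.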
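Problem: the consumer is blocking
When I run the tests in the docker-compose environment, I can see from the logs (included below) that:
- The producer and consumer are connected to the same broker and topic
- The producer sends data to the topic, but the consumer is blocking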
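xUnit and the in-process Kestrel host run in a docker-compose service on the same network as the kafka service. The Kafka producer is able to publish data to the kafka topic successfully, as shown in the logs below.
I have created an additional docker-compose service that runs a Python client consumer. This uses a poll loop to consume the data published while the tests run. The data is consumed by the Python client.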
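A comparable timed poll could be used on the .NET side while fault finding, so that the loop logs each timeout instead of sitting silently inside a blocking call. The following is a sketch only: `Polling.PollUntil` is a hypothetical helper, written against a plain delegate so that it does not depend on the Kafka client.

```csharp
using System;
using System.Diagnostics;

public static class Polling
{
    // Hypothetical helper: calls poll(pollTimeout) repeatedly until it returns a
    // non-null result or the overall deadline passes. Returns null on deadline.
    public static T PollUntil<T>(
        Func<TimeSpan, T> poll,
        TimeSpan pollTimeout,
        TimeSpan deadline,
        Action<string> log) where T : class
    {
        var elapsed = Stopwatch.StartNew();
        while (elapsed.Elapsed < deadline)
        {
            var result = poll(pollTimeout);
            if (result != null)
            {
                return result;
            }
            log($"No message after {elapsed.Elapsed.TotalSeconds:F1}s, polling again");
        }
        return null;
    }
}
```

With the Confluent client, the `Consume(TimeSpan)` overload returns null when the timeout expires, so it could be passed in as `Polling.PollUntil(t => _kafkaConsumer.Consume(t), TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(14), msg => _logger.LogInformation(msg))`.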
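Does anyone have any ideas why the consumer would block in this environment, to help with fault finding? Could the wait performed in the xUnit test be blocking the in-process Kestrel host started by the xUnit fixture?
If I run the Kestrel host locally on macOS Catalina 10.15.7, connecting to Kafka in docker-compose (image: lensesio/fast-data-dev:2.5.1-L0), it produces and consumes successfully.
Update - works with the lensesio image
The local docker-compose that works uses the docker image lensesio/fast-data-dev:2.5.1-L0. This uses Apache Kafka 2.5.1 and Confluent components 5.5.1. I have also tried:
- Downgrading to the Confluent Kafka image 5.5.1
- Upgrading the .NET Confluent client to 1.5.3
The result remains the same: the producer produces fine, but the consumer blocks.
What is the difference between the lensesio/fast-data-dev:2.5.1-L0 configuration and the confluentinc/cp-kafka image that would cause the blocking?
I have appended the working docker-compose configuration to the end of this question.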
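Update - works with the confluentinc/cp-kafka image when group.initial.rebalance.delay is 0ms
Originally group.initial.rebalance.delay was 1ms, the same as the lensesio/fast-data-dev:2.5.1-L0 image. With the 1ms setting, the confluentinc/cp-kafka image exhibits the blocking behaviour.
If I change group.initial.rebalance.delay to 0ms, the blocking does not occur with the confluentinc/cp-kafka image.
Does the lensesio/fast-data-dev:2.5.1-L0 image offer better performance in a docker-compose development environment when used with the confluent-kafka-dotnet client?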
Test
[Fact]
public async Task MotionDetectionEvent_Processes_Data()
{
    var m = new ManualResetEvent(false);
    // spawn a thread to publish a message and wait for 14 seconds
    var thread = new Thread(async () =>
    {
        await _Fixture.Host.MqttClient.PublishAsync(_Fixture.Data.Message);
        // allow time for kafka to consume event and process
        Console.WriteLine($"TEST THREAD IS WAITING FOR 14 SECONDS");
        await Task.Run(() => Task.Delay(14000));
        Console.WriteLine($"TEST THREAD IS COMPLETED WAITING FOR 14 SECONDS");
        m.Set();
    });
    thread.Start();
    // wait for the thread to have completed
    await Task.Run(() => { m.WaitOne(); });
    // TO DO, ASSERT DATA AVAILABLE ON S3 STORAGE ETC.
}
Test output - the producer has produced data on the topic, but the consumer has not consumed it
Test generic host example
SettingsFile::GetConfigMetaData ::: Directory for executing assembly :: /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1
SettingsFile::GetConfigMetaData ::: Executing assembly :: WebApp.Testing.Utils, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null
AutofacTestHost is using settings file /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1/appsettings.Local.json
info: WebApp.Mqtt.MqttService[0]
Mqtt Settings :: mqtt://mqtt:*********@localhost:1883
info: WebApp.Mqtt.MqttService[0]
Mqtt Topic :: shinobi/+/+/trigger
info: WebApp.S3.S3Service[0]
Minio client created for endpoint localhost:9000
info: WebApp.S3.S3Service[0]
minio://accesskey:12345678abcdefgh@localhost:9000
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Starting async initialization
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Starting async initialization for WebApp.Kafka.Admin.KafkaAdminService
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Admin service trying to create Kafka Topic...
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Topic::eventbus, ReplicationCount::1, PartitionCount::3
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Bootstrap Servers::localhost:9092
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Admin service successfully created topic eventbus
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Kafka Consumer thread started
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Async initialization for WebApp.Kafka.Admin.KafkaAdminService completed
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Async initialization completed
info: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[0]
User profile is available. Using '/Users/simon/.aspnet/DataProtection-Keys' as key repository; keys will not be encrypted at rest.
info: WebApp.Kafka.ProducerService[0]
ProducerService constructor called
info: WebApp.Kafka.SchemaRegistry.Serdes.JsonDeserializer[0]
Kafka Json Deserializer Constructed
info: WebApp.Kafka.ConsumerService[0]
Kafka consumer listening to camera topics =>
info: WebApp.Kafka.ConsumerService[0]
Camera Topic :: shinobi/RHSsYfiV6Z/xi5cncrNK6/trigger
info: WebApp.Kafka.ConsumerService[0]
Camera Topic :: shinobi/group/monitor/trigger
%7|1607790673.462|INIT|rdkafka#consumer-3| [thrd:app]: librdkafka v1.5.3 (0x10503ff) rdkafka#consumer-3 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
info: WebApp.Kafka.ConsumerService[0]
Kafka consumer created => Name :: rdkafka#consumer-3
%7|1607790673.509|SUBSCRIBE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": subscribe to new subscription of 1 topics (join state init)
%7|1607790673.509|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state init (join-state init) without assignment: unsubscribe
info: WebApp.Kafka.ConsumerService[0]
Kafka consumer has subscribed to topic eventbus
info: WebApp.Kafka.ConsumerService[0]
Kafka is waiting to consume...
info: WebApp.Mqtt.MqttService[0]
MQTT managed client connected
info: Microsoft.Hosting.Lifetime[0]
Now listening on: http://127.0.0.1:65212
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
Content root path: /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1/
MQTT HAS PUBLISHED...SPAWNING TEST THREAD TO WAIT
TEST THREAD IS WAITING FOR 14 SECONDS
info: WebApp.S3.S3Service[0]
Loading json into JSON DOM and updating 'img' property with key 2d8e2438-e674-4d71-94ac-e54df0143a29
info: WebApp.S3.S3Service[0]
Extracting UTF8 bytes from base64
info: WebApp.S3.S3Service[0]
Updated JSON payload with img: 2d8e2438-e674-4d71-94ac-e54df0143a29, now uploading 1.3053922653198242 MB to S3 storage
%7|1607790674.478|JOIN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": postponing join until up-to-date metadata is available
%7|1607790674.483|REJOIN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": subscription updated from metadata change: rejoining group
%7|1607790674.483|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1607790674.483|JOIN|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Joining group "consumer-group" with 1 subscribed topic(s)
%7|1607790674.541|JOIN|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Joining group "consumer-group" with 1 subscribed topic(s)
info: WebApp.S3.S3Service[0]
Converting modified payload back to UTF8 bytes for Kafka processing
info: WebApp.Kafka.ProducerService[0]
Produce topic : eventbus, key : shinobi/group/monitor/trigger, value : System.Byte[]
info: WebApp.Kafka.ProducerService[0]
Delivered message to eventbus [[2]] @0
%7|1607790675.573|ASSIGNOR|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": "range" assignor run for 1 member(s)
%7|1607790675.588|ASSIGN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": new assignment of 3 partition(s) in join state wait-sync
%7|1607790675.588|OFFSET|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 3/3 partition(s)
%7|1607790675.717|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [0] start fetching at offset 0
%7|1607790675.719|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [1] start fetching at offset 0
%7|1607790675.720|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [2] start fetching at offset 1
** EXPECT SOME CONSUMER DATA HERE - INSTEAD IT IS BLOCKING WITH confluent/cp-kafka image **
TEST THREAD IS COMPLETED WAITING FOR 14 SECONDS
Timer Elapsed
Shutting down generic host
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: WebApp.Mqtt.MqttService[0]
Mqtt managed client disconnected
info: WebApp.Kafka.ConsumerService[0]
The Kafka consumer thread has been cancelled
info: WebApp.Kafka.ConsumerService[0]
Kafka Consumer background service disposing
%7|1607790688.191|CLOSE|rdkafka#consumer-3| [thrd:app]: Closing consumer
%7|1607790688.191|CLOSE|rdkafka#consumer-3| [thrd:app]: Waiting for close events
%7|1607790688.191|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state up (join-state started) with assignment: unsubscribe
%7|1607790688.191|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": unassigning 3 partition(s) (v5)
%7|1607790688.191|LEAVE|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Leaving group
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Consumer closed
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:app]: Terminating instance (destroy flags NoConsumerClose (0x8))
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Closing consumer
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Disabling and purging temporary queue to quench close events
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Consumer closed
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:main]: Destroy internal
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:main]: Removing all topics
info: WebApp.Mqtt.MqttService[0]
Disposing Mqtt Client
info: WebApp.Kafka.ProducerService[0]
Flushing remaining messages to produce...
info: WebApp.Kafka.ProducerService[0]
Disposing Kafka producer...
info: WebApp.S3.S3Service[0]
Disposing of resources
Stopping...
Kafka consumer
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.AspNetCore.SignalR;
using WebApp.Data;
using WebApp.Kafka.Config;
using WebApp.Realtime.SignalR;
namespace WebApp.Kafka
{
    public delegate IConsumer<string, MotionDetection> ConsumerFactory(
        KafkaConfig config,
        IAsyncDeserializer<MotionDetection> serializer
    );
    public class ConsumerService : BackgroundService, IDisposable
    {
        private KafkaConfig _config;
        private readonly IConsumer<string, MotionDetection> _kafkaConsumer;
        private ILogger<ConsumerService> _logger;
        private IHubContext<MotionHub, IMotion> _messagerHubContext;
        private IAsyncDeserializer<MotionDetection> _serializer { get; }
        public ConsumerFactory _factory { get; set; }
        // Using SignalR with background services:
        // https://docs.microsoft.com/en-us/aspnet/core/signalr/background-services?view=aspnetcore-2.2
        public ConsumerService(
            IOptions<KafkaConfig> config,
            ConsumerFactory factory,
            IHubContext<MotionHub, IMotion> messagerHubContext,
            IAsyncDeserializer<MotionDetection> serializer,
            ILogger<ConsumerService> logger
        )
        {
            if (config is null)
                throw new ArgumentNullException(nameof(config));
            _config = config.Value;
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _messagerHubContext = messagerHubContext ?? throw new ArgumentNullException(nameof(messagerHubContext));
            _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
            // enforced configuration
            _config.Consumer.EnableAutoCommit = true; // allow consumer to autocommit offsets
            _config.Consumer.EnableAutoOffsetStore = false; // allow control over which offsets stored
            _config.Consumer.AutoOffsetReset = AutoOffsetReset.Latest; // if no offsets committed for topic for consumer group, default to latest
            _config.Consumer.Debug = "consumer";
            _logger.LogInformation("Kafka consumer listening to camera topics =>");
            foreach (var topic in _config.MqttCameraTopics) { _logger.LogInformation($"Camera Topic :: {topic}"); }
            _kafkaConsumer = _factory(_config, _serializer);
            _logger.LogInformation($"Kafka consumer created => Name :: {_kafkaConsumer.Name}");
        }
        protected override Task ExecuteAsync(CancellationToken cancellationToken)
        {
            new Thread(() => StartConsumerLoop(cancellationToken)).Start();
            return Task.CompletedTask;
        }
        private void StartConsumerLoop(CancellationToken cancellationToken)
        {
            _kafkaConsumer.Subscribe(_config.Topic.Name);
            _logger.LogInformation($"Kafka consumer has subscribed to topic {_config.Topic.Name}");
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    _logger.LogInformation("Kafka is waiting to consume...");
                    var consumerResult = _kafkaConsumer.Consume(cancellationToken);
                    _logger.LogInformation("Kafka Consumer consumed message => {}", consumerResult.Message.Value);
                    if (_config.MqttCameraTopics.Contains(consumerResult.Message.Key))
                    {
                        // we need to consider here security for auth, only want for user
                        // await _messagerHubContext.Clients.All.ReceiveMotionDetection(consumerResult.Message.Value);
                        _logger.LogInformation("Kafka Consumer dispatched message to SignalR");
                        // instruct background thread to commit this offset
                        _kafkaConsumer.StoreOffset(consumerResult);
                    }
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("The Kafka consumer thread has been cancelled");
                    break;
                }
                catch (ConsumeException ce)
                {
                    _logger.LogError($"Consume error: {ce.Error.Reason}");
                    if (ce.Error.IsFatal)
                    {
                        // https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors
                        _logger.LogError(ce, ce.Message);
                        break;
                    }
                }
                catch (Exception e)
                {
                    _logger.LogError(e, $"Unexpected exception while consuming motion detection {e}");
                    break;
                }
            }
        }
        public override void Dispose()
        {
            _logger.LogInformation("Kafka Consumer background service disposing");
            _kafkaConsumer.Close();
            _kafkaConsumer.Dispose();
            base.Dispose();
        }
    }
}
Kestrel host configuration
/// <summary>
/// Build the server, with Autofac IOC.
/// </summary>
protected override IHost BuildServer(HostBuilder builder)
{
    // build the host instance
    return new HostBuilder()
        .UseServiceProviderFactory(new AutofacServiceProviderFactory())
        .ConfigureLogging(logging =>
        {
            logging.ClearProviders();
            logging.AddConsole();
            logging.AddFilter("Microsoft.AspNetCore.SignalR", LogLevel.Information);
        })
        .ConfigureWebHost(webBuilder =>
        {
            webBuilder.ConfigureAppConfiguration((context, cb) =>
            {
                cb.AddJsonFile(ConfigMetaData.SettingsFile, optional: false)
                    .AddEnvironmentVariables();
            })
            .ConfigureServices(services =>
            {
                services.AddHttpClient();
            })
            .UseStartup<TStartup>()
            .UseKestrel()
            .UseUrls("http://127.0.0.1:0");
        }).Build();
}
Docker compose stack
---
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - camnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN
  kafka:
    image: confluentinc/cp-kafka:6.0.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - camnet
    ports:
      - "9092:9092"
      - "19092:19092"
    environment:
      CONFLUENT_METRICS_ENABLE: 'false'
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_BROKER_ID: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 1000
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  mqtt:
    container_name: mqtt
    image: eclipse-mosquitto:1.6.9
    ports:
      - "8883:8883"
      - "1883:1883"
      - "9901:9001"
    environment:
      - MOSQUITTO_USERNAME=${MQTT_USER}
      - MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
    networks:
      - camnet
    volumes:
      - ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
      - ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
      - ./Mqtt/Certs/server.key:/mosquitto/config/server.key
  minio:
    container_name: service-minio
    image: dcs3spp/minio:version-1.0.2
    ports:
      - "127.0.0.1:9000:9000"
    environment:
      - MINIO_BUCKET=images
      - MINIO_ACCESS_KEY=${MINIO_USER}
      - MINIO_SECRET_KEY=${MINIO_PASSWORD}
    networks:
      - camnet
networks:
  camnet:
Works with the lensesio/fast-data-dev image. Why?
version: "3.8"
services:
  kafka:
    image: lensesio/fast-data-dev:2.5.1-L0
    container_name: kafka
    networks:
      - camnet
    ports:
      - 2181:2181 # zookeeper
      - 3030:3030 # ui
      - 9092:9092 # broker
      - 8081:8081 # schema registry
      - 8082:8082 # rest proxy
      - 8083:8083 # kafka connect
    environment:
      - ADV_HOST=127.0.0.1
      - SAMPLEDATA=0
      - REST_PORT=8082
      - FORWARDLOGS=0
      - RUNTESTS=0
      - DISABLE_JMX=1
      - CONNECTORS=${CONNECTOR}
      - WEB_PORT=3030
      - DISABLE=hive-1.1
  mqtt:
    container_name: mqtt
    image: eclipse-mosquitto:1.6.9
    ports:
      - "8883:8883"
      - "1883:1883"
      - "9901:9001"
    environment:
      - MOSQUITTO_USERNAME=${MQTT_USER}
      - MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
    networks:
      - camnet
    volumes:
      - ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
      - ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
      - ./Mqtt/Certs/server.key:/mosquitto/config/server.key
  minio:
    container_name: service-minio
    image: dcs3spp/minio:version-1.0.2
    ports:
      - "127.0.0.1:9000:9000"
    environment:
      - MINIO_BUCKET=images
      - MINIO_ACCESS_KEY=${MINIO_USER}
      - MINIO_SECRET_KEY=${MINIO_PASSWORD}
    networks:
      - camnet
networks:
  camnet: