我将 MassTransit 7.1.4 版与 .Net Core 3.1、ASP.NET Core Web 应用程序一起使用。当我通过Ctrl+关闭应用程序时会发生这种情况C。当我按下停止调试按钮时,它不会发生。负责注册的代码如下:
private void ConfigureMassTransit(IServiceCollection services)
{
services.AddMassTransit(massTransit =>
{
massTransit.UsingInMemory((ctx, cfg)
=> cfg.ConfigureEndpoints(ctx, SnakeCaseEndpointNameFormatter.Instance));
massTransit.ConfigureRider(
_kafkaHost, _dbConnectionString);
});
services.AddMassTransitHostedService();
}
// The rider part.
rider.UsingKafka((ctx, kafka) =>
{
kafka.Host(kafkaHost);
kafka.TopicEndpoint<Null, IMessageName>(topicName, GroupIds.PushedDeals, cfg =>
{
cfg.CheckpointInterval = TimeSpan.FromMilliseconds(100);
cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
cfg.ConfigureSaga<SagaState>(ctx);
});
});
例外:
Fatal error. System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(Int32)
at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
at MassTransit.KafkaIntegration.Contexts.KafkaConsumerContext`2+<>c__DisplayClass15_0[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].<Consume>b__0()
at MassTransit.Util.ChannelExecutor+SynchronousFuture`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Run()
at MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12.MoveNext()
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12, MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]].ExecutionContextCallback(System.Object)
at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12, MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]].MoveNext(System.Threading.Thread)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12, MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]].MoveNext()
at System.Threading.ThreadPoolGlobals+<>c.<.cctor>b__5_0(System.Object)
at System.Threading.Channels.AsyncOperation`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SetCompletionAndInvokeContinuation()
at System.Threading.Channels.AsyncOperation`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SignalCompletion()
at System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].TryWrite(System.__Canon)
at System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].WriteAsync(System.__Canon, System.Threading.CancellationToken)
at MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].MoveNext()
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[[MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]](<Run>d__11`1<System.__Canon> ByRef)
at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Start[[MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]](<Run>d__11`1<System.__Canon> ByRef)
at MassTransit.Util.ChannelExecutor.Run[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]](System.Func`1<System.__Canon>, System.Threading.CancellationToken)
at MassTransit.KafkaIntegration.Contexts.KafkaConsumerContext`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
at MassTransit.KafkaIntegration.Contexts.SharedConsumerContext`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
当我注释掉主题注册或使用 RabbitMq 作为总线时,它已经停止发生。我知道内存总线主要用于测试,但是现有的 Kafka 基础设施,所以运行另一个基础设施没有意义。