0

我将 MassTransit 7.1.4 版与 .Net Core 3.1、ASP.NET Core Web 应用程序一起使用。当我通过Ctrl+关闭应用程序时会发生这种情况C。当我按下停止调试按钮时,它不会发生。负责注册的代码如下:

private void ConfigureMassTransit(IServiceCollection services)
{
    services.AddMassTransit(massTransit =>
    {
        massTransit.UsingInMemory((ctx, cfg)
            => cfg.ConfigureEndpoints(ctx, SnakeCaseEndpointNameFormatter.Instance));

        massTransit.ConfigureRider(
            _kafkaHost, _dbConnectionString);
            });

        services.AddMassTransitHostedService();
}

// The rider part.

rider.UsingKafka((ctx, kafka) =>
{
    kafka.Host(kafkaHost);

    kafka.TopicEndpoint<Null, IMessageName>(topicName, GroupIds.PushedDeals, cfg =>
    {
        cfg.CheckpointInterval = TimeSpan.FromMilliseconds(100);
        cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
        cfg.ConfigureSaga<SagaState>(ctx);
    });                
});

例外:

Fatal error. System.AccessViolationException: Attempted to read or write protected memory. This is often an indication that other memory is corrupt.
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
   at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(Int32)
   at Confluent.Kafka.Consumer`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
   at MassTransit.KafkaIntegration.Contexts.KafkaConsumerContext`2+<>c__DisplayClass15_0[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].<Consume>b__0()
   at MassTransit.Util.ChannelExecutor+SynchronousFuture`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Run()
   at MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12.MoveNext()
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12, MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]].ExecutionContextCallback(System.Object)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12, MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]].MoveNext(System.Threading.Thread)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[MassTransit.Util.ChannelExecutor+<RunFromChannel>d__12, MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]].MoveNext()
   at System.Threading.ThreadPoolGlobals+<>c.<.cctor>b__5_0(System.Object)
   at System.Threading.Channels.AsyncOperation`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SetCompletionAndInvokeContinuation()
   at System.Threading.Channels.AsyncOperation`1[[System.Boolean, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].SignalCompletion()
   at System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].TryWrite(System.__Canon)
   at System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].WriteAsync(System.__Canon, System.Threading.CancellationToken)
   at MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].MoveNext()
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[[MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]](<Run>d__11`1<System.__Canon> ByRef)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Start[[MassTransit.Util.ChannelExecutor+<Run>d__11`1[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], MassTransit, Version=7.1.4.0, Culture=neutral, PublicKeyToken=b8e0e9f2f1e657fa]](<Run>d__11`1<System.__Canon> ByRef)
   at MassTransit.Util.ChannelExecutor.Run[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]](System.Func`1<System.__Canon>, System.Threading.CancellationToken)
   at MassTransit.KafkaIntegration.Contexts.KafkaConsumerContext`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)
   at MassTransit.KafkaIntegration.Contexts.SharedConsumerContext`2[[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.__Canon, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]].Consume(System.Threading.CancellationToken)

当我注释掉主题注册或使用 RabbitMq 作为总线时,它已经停止发生。我知道内存总线主要用于测试,但是现有的 Kafka 基础设施,所以运行另一个基础设施没有意义。

4

0 回答 0