我一直在尝试制作一个网站演示,该网站使用 MassTransit 和 RabbitMQ 将消息发布到在 Service Fabric 上作为有状态服务运行的服务。
一切都很顺利,我的客户会发布一条消息:
IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);
await endPoint.Send<ICompanyRequest>(new {CompanyId = id });
我的服务结构服务中的消费者定义如下:
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
{
h.Username(RabbitMqConstants.UserName);
h.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
{
e.Consumer<PersonInformationConsumer>();
});
});
busControl.Start();
这确实使我可以在课堂上使用该消息,并且可以很好地处理它。当我们想要使用 IReliableDictonary 或 IReliableQueue 或任何需要引用从 Service Fabric 服务中的 RunAsync 函数运行的上下文时,就会出现问题。
所以我的问题是,我如何配置(是否可能)MassTransit 以在有状态的 Service Fabric 服务中工作,该服务了解服务上下文本身?
提前谢谢了。麦克风
更新 好的,如果我将注册例程指向我的消息消费者类(例如),我已经在这方面取得了一些进展:
ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);
然后在我的消息消费者类中,我可以执行以下操作:
internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
private static StatefulServiceContext _currentContext;
#region Constructors
public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
{
_currentContext = serviceContext;
}
public PersonInformationConsumer() : base(_currentContext)
{
}
我现在可以成功调用服务消息:
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
我现在遇到的问题是试图在 IReliableDictionary 上存储一些东西,这样做会导致“对象引用未设置为对象的实例”错误:( ...任何想法都将不胜感激(尽管现在可能要到新的一年才能阅读!)
public async Task Consume(ConsumeContext<ICompanyRequest> context)
{
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
using (ITransaction tx = StateManager.CreateTransaction())
{
try
{
var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");
这导致错误....帮助!:)