0

我有一个服务,它正在生产和使用来自不同 Spring Cloud Stream Channels 的消息(绑定到 EventHub/Kafka 主题)。有几个这样的服务设置类似。

配置如下所示

 public interface MessageStreams {
      String WORKSPACE = "workspace";
      String UPLOADNOTIFICATION = "uploadnotification";
      String BLOBNOTIFICATION = "blobnotification";
      String INGESTIONSTATUS = "ingestionstatusproducer";

      @Input(WORKSPACE)
      SubscribableChannel workspaceChannel();

      @Output(UPLOADNOTIFICATION)
      MessageChannel uploadNotificationChannel();

      @Input(BLOBNOTIFICATION)
      SubscribableChannel blobNotificationChannel();

      @Output(INGESTIONSTATUS)
      MessageChannel ingestionStatusChannel();
    }


    @EnableBinding(MessageStreams.class)
    public class EventHubStreamsConfiguration {
    }

Producer/Publisher 代码如下所示

    @Service
    @Slf4j
    public class IngestionStatusEventPublisher {
      private final MessageStreams messageStreams;

      public IngestionStatusEventPublisher(MessageStreams messageStreams) {
        this.messageStreams = messageStreams;
      }

      public void sendIngestionStatusEvent() {
        log.info("Sending ingestion status event");
        System.out.println("Sending ingestion status event");
        MessageChannel messageChannel = messageStreams.ingestionStatusChannel();
        boolean messageSent = messageChannel.send(MessageBuilder
            .withPayload(IngestionStatusMessage.builder()
                .correlationId("some-correlation-id")
                .status("done")
                .source("some-source")
                .eventTime(OffsetDateTime.now())
                .build())
            .setHeader("tenant-id", "some-tenant")
            .build());
        log.info("Ingestion status event sent successfully {}", messageSent);
      }
    }

同样,我有多个其他发布者发布到不同的事件中心/主题。请注意,为每个发布的消息设置了一个租户 ID 标头。这是我的多租户应用程序特有的,用于跟踪租户上下文。另请注意,我在发送消息时正在获取要发布的频道。

我的消费者代码如下所示

    @Component
    @Slf4j
    public class IngestionStatusEventHandler {
      private AtomicInteger eventCount = new AtomicInteger();

      @StreamListener(TestMessageStreams.INGESTIONSTATUS)
      public void handleEvent(@Payload IngestionStatusMessage message, @Header(name = "tenant-id") String tenantId) throws Exception {
        log.info("New ingestion status event received: {} in Consumer: {}", message, Thread.currentThread().getName());

        // set the tenant context as thread local from the header.

      }

同样,我有几个这样的消费者,并且基于发布者发送的传入租户 ID 标头在每个消费者中设置了一个租户上下文。

我的问题是

如何摆脱在 Publisher 中设置tenant-id 标头并在 Consumer 中设置租户上下文的样板代码,方法是将其抽象到一个库中,该库可以包含在我拥有的所有不同服务中。

此外,是否有一种方法可以根据正在发布的消息的类型动态识别通道。对于给定场景中的 ex IngestionStatusMessage.class

4

1 回答 1

1

tenant-id在通用代码中设置和标头并避免在每个微服务中复制/粘贴它,您可以使用 aChannelInterceptor并将其设置为带有 a@GlobalChannelInterceptor及其patterns选项的全局代码。

在 Spring 集成中查看更多信息:https ://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/core.html#channel-interceptors

https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/overview.html#configuration-enable-integration

您无法通过有效负载类型进行通道选择,因为有效负载类型实际上是由@StreamListener方法签名确定的。

您可以尝试有@Router一个Message<?>期望的将军,然后根据该请求消息上下文返回一个特定的通道名称来路由。

请参阅https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/message-routing.html#messaging-routing-chapter

于 2020-04-20T13:31:37.073 回答