3

我尝试使用 Spring Integration 实现以下工作流程:

  • 1) 轮询 REST API
  • 2) 将 POJO 存储在 Cassandra 集群中

这是我第一次尝试使用 Spring Integration,所以我仍然对来自参考的大量信息感到不知所措。经过一番研究,我可以进行以下工作。

  • 1) 轮询 REST API
  • 2) 将映射的 POJO JSON 结果转换为字符串
  • 3)将字符串保存到文件中

这是代码:

@Configuration
public class ConsulIntegrationConfig {

    @InboundChannelAdapter(value = "consulHttp", poller = @Poller(maxMessagesPerPoll = "1", fixedDelay = "1000"))
    public String consulAgentPoller() {
        return "";
    }

    @Bean
    public MessageChannel consulHttp() {
        return MessageChannels.direct("consulHttp").get();
    }

    @Bean
    @ServiceActivator(inputChannel = "consulHttp")
    MessageHandler consulAgentHandler() {
        final HttpRequestExecutingMessageHandler handler =
            new HttpRequestExecutingMessageHandler("http://localhost:8500/v1/agent/self");
        handler.setExpectedResponseType(AgentSelfResult.class);
        handler.setOutputChannelName("consulAgentSelfChannel");
        LOG.info("Created bean'consulAgentHandler'");
        return handler;
    }

    @Bean
    public MessageChannel consulAgentSelfChannel() {
        return MessageChannels.direct("consulAgentSelfChannel").get();
    }

    @Bean
    public MessageChannel consulAgentSelfFileChannel() {
        return MessageChannels.direct("consulAgentSelfFileChannel").get();
    }

    @Bean
    @ServiceActivator(inputChannel = "consulAgentSelfFileChannel")
    MessageHandler consulAgentFileHandler() {
        final Expression directoryExpression = new SpelExpressionParser().parseExpression("'./'");
        final FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
        handler.setFileNameGenerator(message -> "../../agent_self.txt");
        handler.setFileExistsMode(FileExistsMode.APPEND);
        handler.setCharset("UTF-8");
        handler.setExpectReply(false);
        return handler;
     }
}

@Component
public final class ConsulAgentTransformer {

    @Transformer(inputChannel = "consulAgentSelfChannel", outputChannel = "consulAgentSelfFileChannel")
    public String transform(final AgentSelfResult json) throws IOException {
        final String result = new StringBuilder(json.toString()).append("\n").toString();
        return result;
}

这很好用!

但是现在,我不想将对象写入文件,而是想将其存储在带有 spring-data-cassandra 的 Cassandra 集群中。为此,我在配置文件中注释掉了文件处理程序,在转换器中返回 POJO 并创建以下内容:

@MessagingGateway(name = "consulCassandraGateway", defaultRequestChannel = "consulAgentSelfFileChannel")
public interface CassandraStorageService {

    @Gateway(requestChannel="consulAgentSelfFileChannel")
    void store(AgentSelfResult agentSelfResult);
}

@Component
public final class CassandraStorageServiceImpl implements CassandraStorageService {

    @Override
    public void store(AgentSelfResult agentSelfResult) {
        //use spring-data-cassandra repository to store
        LOG.info("Received 'AgentSelfResult': {} in Cassandra cluster...");
        LOG.info("Trying to store 'AgentSelfResult' in Cassandra cluster...");
}
}

但这似乎是一种错误的做法,服务方法永远不会被触发。

所以我的问题是,对于我的用例来说,正确的方法是什么?我是否必须在我的服务组件中实现 MessageHandler 接口,并在我的配置中使用 @ServiceActivator。还是我当前的“网关方法”中缺少某些东西?或者也许还有另一种解决方案,我看不到..

如前所述,我是 SI 新手,所以这可能是一个愚蠢的问题......

不过,提前非常感谢!

4

2 回答 2

0

所以我终于让它工作了。我需要做的就是

@Component
public final class CassandraStorageServiceImpl implements CassandraStorageService {

    @ServiceActivator(inputChannel="consulAgentSelfFileChannel")
    @Override
    public void store(AgentSelfResult agentSelfResult) {
        //use spring-data-cassandra repository to store
        LOG.info("Received 'AgentSelfResult': {}...");
        LOG.info("Trying to store 'AgentSelfResult' in Cassandra cluster...");
    }
}

CassandraMessageHandler 和 spring-cloud-streaming 对我的用例来说似乎是一个很大的开销,而且我还没有真正理解......通过这个解决方案,我可以控制我的 spring 组件中发生的事情。

于 2015-12-20T13:28:55.180 回答
0

目前尚不清楚您是如何在CassandraStorageServicebean 中接线的。

Spring Integration Cassandra 扩展项目有一个消息处理程序实现。

spring-cloud-stream-modules 中的Cassandra Sink将其与 Java 配置一起使用,因此您可以将其用作示例。

于 2015-12-19T19:34:22.727 回答