我正在使用弹簧批处理远程分区。下面是我的配置
<task:executor id="taskExecutor" pool-size="50" />
<rabbit:template id="computeAmqpTemplate"
connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
reply-timeout="${compute.partition.timeout}">
</rabbit:template>
<int:channel id="computeOutboundChannel">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:channel id="computeInboundStagingChannel" />
<amqp:outbound-gateway request-channel="computeOutboundChannel"
reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<beans:bean id="computeMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="computeOutboundChannel"
p:receiveTimeout="${compute.partition.timeout}" />
<beans:bean id="computePartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
p:stepName="computeStep" p:gridSize="${compute.grid.size}"
p:messagingOperations-ref="computeMessagingTemplate" />
<int:aggregator ref="computePartitionHandler"
send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
input-channel="computeInboundStagingChannel" />
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
request-channel="computeInboundChannel"
reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<int:channel id="computeInboundChannel" />
<int:service-activator ref="stepExecutionRequestHandler"
input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />
<int:channel id="computeOutboundStagingChannel" />
<beans:bean id="computePartitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
scope="step" />
<beans:bean id="computeFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader"
p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
scope="step" />
<beans:bean id="computeItemWriter"
class="com.st.batch.foundation.writers.ComputeItemWriter"
p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
p:batchId="#{jobParameters[batch_id]}" scope="step" />
<step id="computeStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="computeFileItemReader" writer="computeItemWriter"
commit-interval="${compute.commit.interval}" />
</tasklet>
</step>
<flow id="computeFlow">
<step id="computeStep.master">
<partition partitioner="computePartitioner"
handler="computePartitionHandler" />
</step>
</flow>
<job id="computeJob" restartable="true">
<flow id="computeJob.computeFlow" parent="computeFlow" />
</job>
问题是,在多资源分区器中,我传递模式以查找文件并创建等于文件数的分区。但是该目录是在运行时有条件地创建的。
如果目录不存在(输入文件不可用),我希望此步骤成功并继续下一步。
现在,工作只是挂起,不做任何事情。它既不认为该步骤成功也不抛出异常,因此甚至不会失败。它只是在这一步变得空闲。
<beans:bean id="computePartitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
scope="step" />
有没有办法处理这个?我只想将此步骤视为成功并进入下一步。
更新:
刚刚通过创建在本地服务器而不是远程服务器上运行的分区步骤对其进行了测试,如果文件不存在,则默认将步骤标记为已完成。所以问题不在于MultiResourcePartitioner,当我们使用分区步骤在具有上述配置的远程服务器上运行时会出现问题。
我猜,即使没有发送步骤执行消息,它的聚合逻辑也会一直等待响应?是否由于依赖于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 标头属性的默认 SequenceSizeReleaseStrategy 并且因为根本没有消息,所以聚合器无法访问 SEQUENCE_SIZE ?
@MessageEndpoint
public class MessageChannelPartitionHandler implements PartitionHandler {
public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
StepExecution masterStepExecution) throws Exception {
Set<StepExecution> split = stepExecutionSplitter.split(masterStepExecution, gridSize);
int count = 0;
if (replyChannel == null) {
replyChannel = new QueueChannel();
}//end if
for (StepExecution stepExecution : split) {
Message<StepExecutionRequest> request = createMessage(count++, split.size(), new StepExecutionRequest(
stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Sending request: " + request);
}
messagingGateway.send(request);
}
Message<Collection<StepExecution>> message = messagingGateway.receive(replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Received replies: " + message);
}
Collection<StepExecution> result = message.getPayload();
return result;
}
private Message<StepExecutionRequest> createMessage(int sequenceNumber, int sequenceSize,
StepExecutionRequest stepExecutionRequest, PollableChannel replyChannel) {
return MessageBuilder.withPayload(stepExecutionRequest).setSequenceNumber(sequenceNumber)
.setSequenceSize(sequenceSize)
.setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())
.setReplyChannel(replyChannel)
.build();
}
}
如果没有步骤执行请求,即拆分计数为 0,则它不会进入 forloop,因此不会发送任何消息,但在 for 循环之后它仍然会等待接收响应。有什么解决办法?