我正在使用弹簧批处理远程分区进行批处理。我正在使用 Spring Batch admin 启动作业。
我的入站网关消费者并发步长为 10,但并行运行的最大分区数为 8。
我想稍后将消费者并发增加到 15。
下面是我的配置,
<task:executor id="taskExecutor" pool-size="50" />
<rabbit:template id="computeAmqpTemplate"
connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
reply-timeout="${compute.partition.timeout}">
</rabbit:template>
<int:channel id="computeOutboundChannel">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:channel id="computeInboundStagingChannel" />
<amqp:outbound-gateway request-channel="computeOutboundChannel"
reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<beans:bean id="computeMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="computeOutboundChannel"
p:receiveTimeout="${compute.partition.timeout}" />
<beans:bean id="computePartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
p:stepName="computeStep" p:gridSize="${compute.grid.size}"
p:messagingOperations-ref="computeMessagingTemplate" />
<int:aggregator ref="computePartitionHandler"
send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
input-channel="computeInboundStagingChannel" />
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
request-channel="computeInboundChannel"
reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<int:channel id="computeInboundChannel" />
<int:service-activator ref="stepExecutionRequestHandler"
input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />
<int:channel id="computeOutboundStagingChannel" />
<beans:bean id="computePartitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
scope="step" />
<beans:bean id="computeFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader"
p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
scope="step" />
<beans:bean id="computeItemWriter"
class="com.st.batch.foundation.writers.ComputeItemWriter"
p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
p:batchId="#{jobParameters[batch_id]}" scope="step" />
<step id="computeStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="computeFileItemReader" writer="computeItemWriter"
commit-interval="${compute.commit.interval}" />
</tasklet>
</step>
<flow id="computeFlow">
<step id="computeStep.master">
<partition partitioner="computePartitioner"
handler="computePartitionHandler" />
</step>
</flow>
<job id="computeJob" restartable="true">
<flow id="computeJob.computeFlow" parent="computeFlow" />
</job>
compute.grid.size = 112
compute.consumer.concurrency = 10
Input files are splited to 112 equal parts = compute.grid.size = total number of partitions
Number of servers = 4.
有2个问题,
i) 即使我将并发设置为 10,运行的最大线程数是 8。
ii)
有些较慢,因为其他进程在它们上面运行,有些则更快,所以我想确保公平分配步骤执行,即如果更快的服务器完成了它们的执行,队列中的其他剩余执行应该交给它们。它不应该以循环方式分发。
我知道在 rabbitmq 中有预取计数设置和 ack 模式可以很好地分发。对于spring集成,prefetch count默认为1,ack模式默认为AUTO。但是即使其他服务器长时间运行,仍然有一些服务器继续运行更多分区。理想情况下,任何服务器都不应闲置。
更新:
我现在观察到的另一件事是,对于使用 split 并行运行的某些步骤(不使用远程分区进行分布式)也最多并行运行 8 个。它看起来像线程池限制问题,但您可以看到 taskExecutor 的 pool-size 设置为 50。
spring-batch/spring-batch-admin 中是否有任何限制并发运行步骤数的内容?
第二次更新:
而且,如果有 8 个或更多线程在并行处理项中运行,则不会加载 spring batch admin。它只是挂起。如果我降低并发性,弹簧批处理管理负载。我什至通过在一台服务器上设置并发 4 和在另一台服务器上设置 8 来测试它,spring batch admin 没有加载它我使用运行 8 个线程的服务器的 URL,但它在运行 4 个线程的服务器上工作。
Spring 批处理管理器具有以下 jobLauncher 配置,
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>
<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />
那里的池大小为6,与上述问题有什么关系?
或者在 tomcat 7 中是否有任何限制运行线程数为 8 的东西?