尝试使用 Spring Cloud Stream 3.0 以批处理模式使用 kafka 消息。
消费者收到一个包含单个记录的列表,而不是更多。
下面是 yml ,使用的消费者编码
spring:
cloud:
stream:
bindings:
process-in-0:
destination: person-command
consumer:
# maxAttempts: 1
batch-mode: true
properties:
maxPollRecords: 10
minFetchBytes: 5000
fetchMaxWaitMs: 1000
消费者代码
@Transactional
@Bean
public Function<List<PersonEvent>, List<PersonEvent>> process() {
return pel ->{
List<Person> lstPerson = new ArrayList<Person>();
List<PersonEvent> lstPersonEvent = new ArrayList<PersonEvent>();
for (PersonEvent personEvent : pel) {
Person person = new Person();
person.setName(personEvent.getName());
lstPerson.add(person);
personEvent.setType("PersonSaved");
lstPersonEvent.add(personEvent);
}
logger.info("Person Size {}"+lstPerson.size());
Iterable<Person> savedPerson = repository.saveAll(lstPerson);
logger.info("Saved Person Size {}"+lstPerson.size());
return lstPersonEvent;
};
}
输出:日志显示在列表中获取了一条记录,而不是我们需要一批 10 条记录
2020-01-05 15:11:49.044 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Person Size {}1
2020-01-05 15:11:49.054 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Saved Person Size {}1
2020-01-05 15:11:50.045 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Person Size {}1
2020-01-05 15:11:50.053 INFO 29590 --- [container-0-C-1] ication$$EnhancerBySpringCGLIB$$6d65e615 : Saved Person Size {}1