我正在使用 Spring Cloud Stream 的 OOTB 示例。
我的主要代码中有这段代码。
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
public class AvroKafkaApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(AvroKafkaApplication.class, args);
}
}
当我在其中投入一些东西时,我的生产者类中的处理器类为空。
@Service
public class AvroProducer {
@Autowired
private Processor processor;
public void produceEmployeeDetails(int empId, String firstName, String lastName) {
// creating employee details
Employee employee = new Employee();
employee.setId(empId);
employee.setFirstName(firstName);
employee.setLastName(lastName);
employee.setDepartment("IT");
employee.setDesignation("Engineer");
// creating partition key for kafka topic
EmployeeKey employeeKey = new EmployeeKey();
employeeKey.setId(empId);
employeeKey.setDepartmentName("IT");
Message<Employee> message = MessageBuilder.withPayload(employee)
.setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
.build();
processor.output()
.send(message);
}
}
@Autowired 注释工作正常,因为我的功能没有错误,但流程变量随时为空。
可以从这里克隆基础项目: https ://github.com/eugenp/tutorials/tree/master/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka
对此有什么想法吗?
提前致谢!