1

我正在使用 Spring Cloud Stream 的 OOTB 示例。

我的主要代码中有这段代码。

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
public class AvroKafkaApplication {

    public static void main(String[] args) throws InterruptedException {

        SpringApplication.run(AvroKafkaApplication.class, args);
     }
}

当我在其中投入一些东西时,我的生产者类中的处理器类为空。

@Service
public class AvroProducer {

    @Autowired
    private Processor processor;

    public void produceEmployeeDetails(int empId, String firstName, String lastName) {

        // creating employee details
        Employee employee = new Employee();
        employee.setId(empId);
        employee.setFirstName(firstName);
        employee.setLastName(lastName);
        employee.setDepartment("IT");
        employee.setDesignation("Engineer");

        // creating partition key for kafka topic
        EmployeeKey employeeKey = new EmployeeKey();
        employeeKey.setId(empId);
        employeeKey.setDepartmentName("IT");

        Message<Employee> message = MessageBuilder.withPayload(employee)
            .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
            .build();

        processor.output()
            .send(message);
    }

}

@Autowired 注释工作正常,因为我的功能没有错误,但流程变量随时为空。

可以从这里克隆基础项目: https ://github.com/eugenp/tutorials/tree/master/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka

对此有什么想法吗?

提前致谢!

4

1 回答 1

3

我刚刚下载了你的项目,它对我来说很好用......

$ http post localhost:8080/employees/1/John/Doe
HTTP/1.1 200 
Content-Length: 33
Content-Type: text/plain;charset=UTF-8
Date: Wed, 11 Dec 2019 21:52:38 GMT

Sent employee details to consumer

2019-12-11 16:52:38.131 INFO 24203 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer :让我们处理员工详细信息:{“id”:1,“firstName”:“John ", "lastName": "Doe", "department": "IT", "designation": "Engineer"}

我所做的唯一更改是修复concurrencyyaml 中的属性(缺少consumer元素),但这并没有什么不同。

于 2019-12-11T21:54:26.790 回答