我正在使用 Kafka-Connect 来实现 Kafka-Elasticsearch 连接器。
生产者向 Kafka 主题发送了一个复杂的 JSON,我的连接器代码将使用它来持久化到 Elastic 搜索。连接器以 Struct 的形式获取数据(https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html)。
我能够在顶级 Json 中获取 struct 的字段值,但无法从嵌套的 json 中获取。
{
"after": {
"test.test.employee.Value": {
"id": 5671111,
"name": {
"string": "abc"
}
}
},
"op": "u",
"ts_ms": {
"long": 1474892835943
}
}
我能够解析“op”,但不能解析“test.test.employee.Value”。
Struct afterStruct = struct.getStruct("after"); // giving me proper value.
String opValue = struct.getString("op"); // giving me proper value of "u".
Struct valueStruct = afterStruct .getStruct("test.test.employee.Value"); // org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name