1

我正在尝试在 Apache NiFi 中创建一个自定义处理器,它可以将属性/字符串添加到流文件内容中的 JSON 对象。目前,当我只使用字符串时它可以工作,但是当我使用 NiFi 的表达式语言时它不起作用,尽管我的代码支持它。

表达式语言是 100% 正确的,因为它在另一个处理器中工作,我还尝试了不同的属性以确保它不是属性。

物业:

public static final PropertyDescriptor ADD_ATTRIBUTE = new PropertyDescriptor
        .Builder().name("Add Attribute")
        .description("Example Property")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true)
        .build();

稍后在我的代码中,当我想获取值并放入我使用的 JSON 对象时:

jsonObject.put("hostname", context.getProperty(ADD_ATTRIBUTE).evaluateAttributeExpressions().getValue());

我还进行了单元测试,当我为 testrunner.setProperty 分配一个文本值时它可以工作。但是我不知道如何将属性分配给 testrunner 或如何在测试中使用表达式语言。

提前感谢您的任何建议或解决方案!

4

2 回答 2

5

我也会把Hortonworks Community Connection的答案放在这里 FWIW:

如果表达式引用流文件上的属性,则需要将对流文件的引用传递给 evaluateAttributeExpressions:

FlowFile flowFile = session.get();
jsonObject.put("hostname", context.getProperty(ADD_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue());

如果属性包含属性名称(而不是包含属性名称的表达式)并且您需要流文件中的值:

jsonObject.put("hostname", flowFile.getAttribute(context.getProperty(ADD_ATTRIBUTE).getValue()));

如果属性值本身包含表达式语言并且您想要评估它,请查看以下类:

org.apache.nifi.attribute.expression.language.Query
于 2016-03-11T18:49:55.587 回答
2

关于测试...

假设您正在针对传入的 FlowFile (evaluateAttributeExpressions(flowFile)) 评估表达式语言,那么您可以执行以下操作:

runner.setProperty(ADD_ATTRIBUTE, "${my.attribute}");

然后创建一个包含 my.attribute 的属性 Map:

final Map<String,String> attributes = new HashMap<>(); attributes.put("my.attribute", myAttribute);

然后将一些具有属性的内容排入队列:

runner.enqueue(fileIn, attributes); runner.run();

代码库中的一个示例:

https://github.com/apache/nifi/blob/1e56de9521e4bc0752b419ffc7d62e096db1c389/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/ TestPutSolrContentStream.java#L243

于 2016-03-12T15:38:22.600 回答