1

我构建了一个风暴三叉戟拓扑,如下所示:

tridentTopology.newStream(spoutId, spout).parallelismHint(spoutParallel)
                       .each(new Fields("tId", "message"), new VerifyFilter())
                       .each(new Fields("tId", "message"), new ParseFunction(), new Fields("rowKey", "column", "value"))
                       .groupBy(new Fields("rowKey", "column"))
                       .aggregate(new Fields("value"),new Count(),new Fields("count"))
                       .each(new Fields("rowKey", "column", "count"), new SaveFunction(), new Fields(columns))
                       .shuffle()
                       .partitionPersist(factory, new Fields(rowKeyANdColumns), new HBaseUpdater())
                       .parallelismHint(boltParallel);

util execute each(new Fields("rowKey", "column", "count"), new SaveFunction(), new Fields(columns),我在SaveFunction中得到了打印日志,但是partitionPersist没有将日期写入hbase,并且所有拓扑都没有错误;

谁能给点建议?

4

0 回答 0