我有一个结构如下的表。
trans_count
start_time, end_time, count
00:00:01 00:00:10 1000
00:00:11 00:00:20 800
Spark 侦听来自 Kafka 的事件并进行 10 秒的分组,并且必须插入到 Phoenix hbase 表中。
10 秒后,我必须首先检查 start_time、end_time 组合是否在表中。如果它在那里,我们必须采用现有计数并添加新计数并再次插入。
UPSERT INTO trans_count(start_time, end_time, count) SELECT start_time, end_time, count? FROM trans_count WHERE start_time = ? AND end_time = ?
如果上述语句中没有更新插入行,则只需更新插入数据。
在 Apache Storm 中,我能够在 configure 方法中创建一个 Phoenix Connection 对象,并且能够每 10 秒使用一次相同的连接到 UPSERT。
在 Spark 中,我无法创建连接对象并为 RDD 中的每个对象使用相同的对象。我从 spark 的输出将是一个 JavaDStream>,其中 start_time、end_time、count 都是 Map 中的键。
我最终为 RDD 的每次迭代创建了一个连接对象,我觉得这不是正确的方法。我读过 Phoenix 连接是轻量级的,但是为每个 RDD 创建一个连接对象似乎不是正确的方法。
我阅读了一些有关相关内容的博客,但无法做到这一点。请帮忙。
注意:该应用程序是在 JAVA 中构建的。