现在我ResultScanner像这样实现行计数
for (Result rs = scanner.next(); rs != null; rs = scanner.next()) {
number++;
}
如果数据达到数百万次计算量很大。我想实时计算我不想使用 Mapreduce
如何快速计算行数。
在 HBase 中使用 RowCounter RowCounter 是一个 mapreduce 作业,用于计算表的所有行。这是一个很好的实用程序,可用作完整性检查,以确保 HBase 在存在元数据不一致问题时可以读取表的所有块。它将在单个进程中运行所有 mapreduce,但如果您有一个 MapReduce 集群可供它利用,它将运行得更快。
$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter <tablename>
Usage: RowCounter [options]
<tablename> [
--starttime=[start]
--endtime=[end]
[--range=[startKey],[endKey]]
[<column1> <column2>...]
]
您可以使用 hbase 中的 count 方法来统计行数。但是,是的,计算大表的行数可能很慢。count 'tablename' [interval]
返回值是行数。
此操作可能需要很长时间(运行 '$HADOOP_HOME/bin/hadoop jar hbase.jar rowcount' 以运行计数 mapreduce 作业)。当前计数默认每 1000 行显示一次。可以选择指定计数间隔。默认情况下,对计数扫描启用扫描缓存。默认缓存大小为 10 行。如果您的行较小,您可能需要增加此参数。
例子:
hbase> count 't1'
hbase> count 't1', INTERVAL => 100000
hbase> count 't1', CACHE => 1000
hbase> count 't1', INTERVAL => 10, CACHE => 1000
相同的命令也可以在表引用上运行。假设您引用了表 't1',相应的命令将是:
hbase> t.count
hbase> t.count INTERVAL => 100000
hbase> t.count CACHE => 1000
hbase> t.count INTERVAL => 10, CACHE => 1000
如果您因任何原因无法使用RowCounter,那么这两个过滤器的组合应该是获得计数的最佳方式:
FirstKeyOnlyFilter() AND KeyOnlyFilter()
这FirstKeyOnlyFilter将导致扫描器仅返回它找到的第一个列限定符,而不是扫描器返回表中的所有列限定符,这将最小化网络带宽。简单地选择一个列限定符返回怎么样?如果您可以保证每一行都存在列限定符,这将起作用,但如果这不是真的,那么您将得到不准确的计数。
这KeyOnlyFilter将导致扫描器仅返回列族,并且不会为列限定符返回任何值。这进一步降低了网络带宽,在一般情况下不会造成太大的减少,但可能存在一种边缘情况,即前一个过滤器选择的第一列恰好是一个非常大的值。
我试着玩弄,scan.setCaching但结果到处都是。也许它会有所帮助。
我在开始和停止之间有 1600 万行,我做了以下伪经验测试:
激活 FirstKeyOnlyFilter 和 KeyOnlyFilter :
未设置缓存(即默认值)时,需要 188 秒。
缓存设置为 1,耗时 188 秒
缓存设置为 10,需要 200 秒
缓存设置为 100,耗时 187 秒
缓存设置为 1000,耗时 183 秒。
缓存设置为 10000,耗时 199 秒。
缓存设置为 100000,耗时 199 秒。
禁用 FirstKeyOnlyFilter 和 KeyOnlyFilter:
未设置缓存(即默认值),耗时 309 秒
我没有费心对此进行适当的测试,但似乎很明显FirstKeyOnlyFilterandKeyOnlyFilter是好的。
此外,这个特定表格中的单元格非常小 - 所以我认为过滤器在不同的表格上会更好。
这是一个 Java 代码示例:
导入 java.io.IOException;
导入 org.apache.hadoop.conf.Configuration;
导入 org.apache.hadoop.hbase.HBaseConfiguration;
导入 org.apache.hadoop.hbase.client.HTable;
导入 org.apache.hadoop.hbase.client.Result;
导入 org.apache.hadoop.hbase.client.ResultScanner;
导入 org.apache.hadoop.hbase.client.Scan;
导入 org.apache.hadoop.hbase.util.Bytes;
导入 org.apache.hadoop.hbase.filter.RowFilter;
导入 org.apache.hadoop.hbase.filter.KeyOnlyFilter;
导入 org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
导入 org.apache.hadoop.hbase.filter.FilterList;
导入 org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
导入 org.apache.hadoop.hbase.filter.RegexStringComparator;
公共类 HBaseCount {
公共静态 void main(String[] args) 抛出 IOException {
配置配置 = HBaseConfiguration.create();
HTable 表 = 新 HTable(config, "my_table");
扫描扫描=新扫描(
Bytes.toBytes("foo"), Bytes.toBytes("foo~")
);
如果(args.length == 1){
scan.setCaching(Integer.valueOf(args[0]));
}
System.out.println("scan的缓存是" + scan.getCaching());
FilterList allFilters = new FilterList();
allFilters.addFilter(new FirstKeyOnlyFilter());
allFilters.addFilter(new KeyOnlyFilter());
scan.setFilter(allFilters);
ResultScanner 扫描仪 = table.getScanner(scan);
整数计数 = 0;
长开始 = System.currentTimeMillis();
尝试 {
for (结果 rr =scanner.next(); rr != null; rr =scanner.next()) {
计数 += 1;
if (count % 100000 == 0) System.out.println(count);
}
} 最后 {
扫描仪.close();
}
长端 = System.currentTimeMillis();
long elapsedTime = 结束 - 开始;
System.out.println("经过的时间是 " + (elapsedTime/1000F));
}
}
这是一个pychbase代码示例:
从 pychbase 导入连接
c = 连接()
t = c.table('my_table')
# 在后台这应用了 FirstKeyOnlyFilter 和 KeyOnlyFilter
# 类似于下面的happybase示例
打印 t.count(row_prefix="foo")
这是一个 Happybase 代码示例:
从happybase导入连接
c = 连接(...)
t = c.table('my_table')
计数 = 0
对于 _ in t.scan(filter='FirstKeyOnlyFilter() AND KeyOnlyFilter()'):
计数 += 1
打印计数
感谢@Tuckr 和@KennyCason的提示。
在 HBASE 中计算行数的简单、有效和高效的方法:
每当您插入一行时,都会触发此 API,它将增加该特定单元格。
Htable.incrementColumnValue(Bytes.toBytes("count"), Bytes.toBytes("details"), Bytes.toBytes("count"), 1);
检查该表中存在的行数。只需对该特定行“计数”使用“获取”或“扫描”API。
通过使用此方法,您可以在不到一毫秒的时间内获得行数。
要计算正确 YARN 集群上的 Hbase 表记录数,您还必须设置 map reduce 作业队列名称:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter -Dmapreduce.job.queuename= < Your Q Name which you have SUBMIT access>
< TABLE_NAME>
您可以使用自 HBase 0.92 以来可用的协处理器。请参阅协处理器和AggregateProtocol和示例
两种方法为我工作以速度从 hbase 表中获取行数
情景#1
如果 hbase 表大小很小,则使用有效用户登录到 hbase shell 并执行
>count '<tablename>'
例子
>count 'employee'
6 row(s) in 0.1110 seconds
情景#2
如果 hbase 表大小很大,则执行内置的 RowCounter map reduce 作业:使用有效用户登录到 hadoop 机器并执行:
/$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter '<tablename>'
例子:
/$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'employee'
....
....
....
Virtual memory (bytes) snapshot=22594633728
Total committed heap usage (bytes)=5093457920
org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters
ROWS=6
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
如果您使用的是扫描仪,请在您的扫描仪中尝试让它返回尽可能少的限定符。实际上,您返回的限定符应该是可用的最小的(以字节为单位)。这将大大加快您的扫描速度。
不幸的是,这只会扩大到目前(数百万?)。更进一步,您可以实时执行此操作,但您首先需要运行 mapreduce 作业来计算所有行数。
将 Mapreduce 输出存储在 HBase 的一个单元中。每次添加一行,计数器加 1。每次删除一行,计数器减一。
当您需要实时访问行数时,您可以在 HBase 中读取该字段。
没有快速的方法来以可扩展的方式计算行数。你只能数这么快。
转到 Hbase 主目录并运行此命令,
./bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'namespace:tablename'
这将启动一个 mapreduce 作业,输出将显示 hbase 表中存在的记录数。
你可以在这里找到示例:
/**
* Used to get the number of rows of the table
* @param tableName
* @param familyNames
* @return the number of rows
* @throws IOException
*/
public long countRows(String tableName, String... familyNames) throws IOException {
long rowCount = 0;
Configuration configuration = connection.getConfiguration();
// Increase RPC timeout, in case of a slow computation
configuration.setLong("hbase.rpc.timeout", 600000);
// Default is 1, set to a higher value for faster scanner.next(..)
configuration.setLong("hbase.client.scanner.caching", 1000);
AggregationClient aggregationClient = new AggregationClient(configuration);
try {
Scan scan = new Scan();
if (familyNames != null && familyNames.length > 0) {
for (String familyName : familyNames) {
scan.addFamily(Bytes.toBytes(familyName));
}
}
rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
} catch (Throwable e) {
throw new IOException(e);
}
return rowCount;
}
你可以试试 hbase api 方法!
org.apache.hadoop.hbase.client.coprocessor.AggregationClient