我从文件中读取行,当然是在一个线程中。行是按键排序的。
然后我收集具有相同键的行(15-20行),进行解析,大计算等,并将结果对象推送到统计类。
我想并行我的程序以在一个线程中读取,在多个线程中进行解析和计算,并将结果连接到一个线程中以写入 stat 类。
java7框架中是否有针对此问题的现成模式或解决方案?
我通过执行器实现多线程、推送到blockingQueue以及在另一个线程中读取队列,但我认为我的代码很糟糕并且会产生错误
非常感谢
更新:
我无法映射内存中的所有文件 - 它非常大
我从文件中读取行,当然是在一个线程中。行是按键排序的。
然后我收集具有相同键的行(15-20行),进行解析,大计算等,并将结果对象推送到统计类。
我想并行我的程序以在一个线程中读取,在多个线程中进行解析和计算,并将结果连接到一个线程中以写入 stat 类。
java7框架中是否有针对此问题的现成模式或解决方案?
我通过执行器实现多线程、推送到blockingQueue以及在另一个线程中读取队列,但我认为我的代码很糟糕并且会产生错误
非常感谢
更新:
我无法映射内存中的所有文件 - 它非常大
您已经记住了主要的方法类别。CountDownLatch、Thread.join、Executors、Fork/Join。另一种选择是 Akka 框架,它的消息传递开销在 1-2 微秒内测量,并且是开源的。但是,让我分享另一种通常优于上述方法并且更简单的方法,这种方法源于许多公司在 Java 中处理批处理文件加载。
假设您拆分工作的目标是表现,而不是学习。以从开始到结束需要多长时间来衡量的性能。然后,通常很难使其比内存映射文件更快,并且在已固定到单个内核的单个线程中处理。它也提供了更简单的代码。双赢。
这可能与直觉相反,但是处理文件的速度几乎总是受到文件加载效率的限制。不是处理的并行程度。因此,内存映射文件是一个巨大的胜利。一旦内存映射,我们希望算法在执行文件加载时与硬件的争用较低。现代硬件倾向于将 IO 控制器和内存控制器与 CPU 放在同一个插槽上;当与 CPU 本身中的预取器结合使用时,当从单个线程以有序方式处理文件时,会带来非常高的效率。这可能非常极端,以至于并行运行实际上可能要慢得多。将线程固定到内核通常会将内存绑定算法的速度提高 5 倍。这就是内存映射部分如此重要的原因。
如果您还没有,请尝试一下。
没有事实和数字,很难给你建议。所以让我们从头开始:
话虽如此,您应该是正确的轨道之一。您可以从使用适当大小的任务的 Executor 开始。任务写入数据结构,如您的阻塞队列,在工作人员和“数据收集器”线程之间共享。这种线程模型非常简单、高效且不易出错。它通常足够有效。如果您仍然需要更好的性能,那么您必须分析您的应用程序并了解瓶颈。然后你可以决定要走的路:优化你的任务规模,使用像disruptor/Akka这样更快的工具,改进IO,创建更少的对象,调整你的代码,购买更大的机器或更快的磁盘,迁移到Hadoop等。固定每个线程到核心(需要特定于平台的代码)也可以提供显着的提升。
如果您在尝试时被要求拆分工作,我会这样做:
public class App {
public static class Statistics {
}
public static class StatisticsCalculator implements Callable<Statistics> {
private final List<String> lines;
public StatisticsCalculator(List<String> lines) {
this.lines = lines;
}
@Override
public Statistics call() throws Exception {
//do stuff with lines
return new Statistics();
}
}
public static void main(String[] args) {
final File file = new File("path/to/my/file");
final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
final List<Callable<Statistics>> callables = new LinkedList<>();
for (final List<String> work : partitionedWork) {
callables.add(new StatisticsCalculator(work));
}
final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10));
final List<Future<Statistics>> futures;
try {
futures = executorService.invokeAll(callables);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
static List<String> readLines(final File file) {
//read lines
return new ArrayList<>();
}
static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
//divide up the incoming list into a number of chunks
final List<List<String>> partitionedWork = new LinkedList<>();
for (int i = lines.size(); i > 0; i -= blockSize) {
int start = i > blockSize ? i - blockSize : 0;
partitionedWork.add(lines.subList(start, i));
}
return partitionedWork;
}
}
我创建了一个Statistics对象,它保存了完成工作的结果。
有一个StatisticsCalculator对象是 a Callable<Statistics>- 这会进行计算。它被赋予 aList<String>并处理行并创建Statistics.
readLines我留给你实施的方法。
在许多方面最重要的方法是partitionWork方法,它将List<String>文件中所有行的传入划分为List<List<String>>使用blockSize. 这基本上决定了每个线程应该有多少工作,blockSize参数的调整非常重要。好像每件作品只有一行,那么开销可能会超过优势,而如果每件作品有一万行,那么你只有一个工作Thread。
最后,操作的核心是main方法。这将调用读取然后分区方法。它产生一个ExecutorService线程数等于工作位数但最多 10 个的线程。您可以通过方法使其等于您拥有的内核数。
然后,该main方法将List所有Callables 中的一个(每个块一个)提交给executorService. 该invokeAll方法阻塞,直到工作完成。
该方法现在遍历每个返回List<Future>并获取每个生成的Statistics对象;准备聚合。
之后不要忘记关闭它,executorService因为它会阻止您的申请表退出。
编辑
OP想要逐行阅读所以这里是一个修改过的main
public static void main(String[] args) throws IOException {
final File file = new File("path/to/my/file");
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final List<Future<Statistics>> futures = new LinkedList<>();
try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
List<String> tmp = new LinkedList<>();
String line = null;
while ((line = reader.readLine()) != null) {
tmp.add(line);
if (tmp.size() == 100) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
tmp = new LinkedList<>();
}
}
if (!tmp.isEmpty()) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
}
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
这将逐行流式传输文件,并且在给定数量的行之后触发一个新任务以将行处理到执行程序。
clear完成后,您需要调用List<String>in ,Callable因为实例是它们返回Callable的 s 的引用。Future如果您在完成后清除Lists ,这应该会大大减少内存占用。
进一步的增强可能是使用这里的建议来阻塞直到有一个备用线程 - 这将保证如果您在 s 完成时清除s ,那么一次内存中的行ExecutorService数不会超过s 。threads*blocksizeListCallable
您可以使用 CountDownLatch http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html
同步线程的启动和加入。这比在线程集上循环并在每个线程引用上调用 join() 更好。