我有一个抓取大约六千个 url 的应用程序。为了最大限度地减少这项工作,我创建了一个 RecursiveTask,它消耗所有要抓取的 URL 的 ConcurrentLinkedQueue。它最多分成 50 个,如果 que 是空的,它会直接抓取它,但如果不是,它首先会创建一个自己的新实例并分叉它,然后它会抓取 50 的子集,然后它将加入分叉的任务。
现在我的问题来了,直到每个线程都工作了他的 50 个所有四个工作快速和同时工作。但是在两个停止工作并等待加入之后,只有另外两个正在工作并创建新的分叉和爬取页面。
为了可视化这一点,我计算了线程爬取 URL 的数量并让 JavaFX gui 显示它。
我错了什么,所以 ForkJoinFramewok 只使用了我的四个允许线程中的两个?我能做些什么来改变它?
这是我的任务计算方法:
LOG.debug(
Thread.currentThread().getId() + " Starting new Task with "
+ urlsToCrawl.size() + " left."
);
final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
{
urlsToCrawlSubset.offer(urlsToCrawl.poll());
}
LOG.debug(
Thread.currentThread().getId() + " Crated a Subset with "
+ urlsToCrawlSubset.size() + "."
);
LOG.debug(
Thread.currentThread().getId()
+ " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
);
if (urlsToCrawl.isEmpty())
{
LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
crawlPage(urlsToCrawlSubset);
}
else
{
LOG.debug(
Thread.currentThread().getId()
+ " Creating a new Task and crawling the subset."
);
final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
otherTask.fork();
crawlPage(urlsToCrawlSubset);
taskResults.addAll(otherTask.join());
}
return taskResults;
Ps 如果我允许多达 80 个线程,它将使用它们,直到每个有 50 个 URL 被抓取,然后只使用两个。
如果您有兴趣,这里是完整的源代码:https ://github.com/mediathekview/MServer/tree/feature/cleanup