0

我有一个抓取大约六千个 url 的应用程序。为了最大限度地减少这项工作,我创建了一个 RecursiveTask,它消耗所有要抓取的 URL 的 ConcurrentLinkedQueue。它最多分成 50 个,如果 que 是空的,它会直接抓取它,但如果不是,它首先会创建一个自己的新实例并分叉它,然后它会抓取 50 的子集,然后它将加入分叉的任务。

现在我的问题来了,直到每个线程都工作了他的 50 个所有四个工作快速和同时工作。但是在两个停止工作并等待加入之后,只有另外两个正在工作并创建新的分叉和爬取页面。

为了可视化这一点,我计算了线程爬取 URL 的数量并让 JavaFX gui 显示它。

我错了什么,所以 ForkJoinFramewok 只使用了我的四个允许线程中的两个?我能做些什么来改变它?

这是我的任务计算方法:

    LOG.debug(
       Thread.currentThread().getId() + " Starting new Task with " 
          + urlsToCrawl.size() + " left."
    );
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
    {
        urlsToCrawlSubset.offer(urlsToCrawl.poll());
    }
    LOG.debug(
       Thread.currentThread().getId() + " Crated a Subset with " 
       + urlsToCrawlSubset.size() + "."
    );
    LOG.debug(
       Thread.currentThread().getId() 
       + " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
    );

    if (urlsToCrawl.isEmpty())
    {
        LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
        crawlPage(urlsToCrawlSubset);
    }
    else
    {
        LOG.debug(
           Thread.currentThread().getId() 
              + " Creating a new Task and crawling the subset."
        );
        final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
        otherTask.fork();
        crawlPage(urlsToCrawlSubset);
        taskResults.addAll(otherTask.join());
    }
    return taskResults;

这是我的图表的快照: 在此处输入图像描述

Ps 如果我允许多达 80 个线程,它将使用它们,直到每个有 50 个 URL 被抓取,然后只使用两个。

如果您有兴趣,这里是完整的源代码:https ://github.com/mediathekview/MServer/tree/feature/cleanup

4

1 回答 1

0

我修好了它。我的错误是,我拆分然后工作了一小部分,然后等待而不是将其拆分为两半,然后与其余的另一半再次调用我自己,等等。

换句话说,在我分裂并直接工作之前,但正确的是分裂直到所有分裂然后开始工作。

这是我的代码现在的样子:

@Override
protected Set<T> compute()
{
    if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask())
    {
        crawlPage(urlsToCrawl);
    }
    else
    {
        final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl));
        final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl);
        leftTask.fork();
        taskResults.addAll(rightTask.compute());
        taskResults.addAll(leftTask.join());
    }
    return taskResults;
}

private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue)
{
    final int halfSize = aBaseQueue.size() / 2;
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < halfSize; i++)
    {
        urlsToCrawlSubset.offer(aBaseQueue.poll());
    }
    return urlsToCrawlSubset;
}
于 2017-09-17T01:10:38.163 回答