1

我正在尝试trio使用asks. 我使用nursery 一次启动几个爬虫,并使用内存通道来维护要访问的url 列表。

每个爬虫都会接收该通道两端的克隆,因此它们可以获取一个 url(通过 receive_channel)、读取它、查找并添加要访问的新 url(通过 send_channel)。

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

在这种情况下,消费者生产者。如何优雅地停止一切?

关闭一切的条件是:

  • 频道为空
  • 所有爬虫都卡在第一个 for 循环中,等待 url 出现在 receive_channel 中(这......不会再发生了)

async with send_channel我在里面试过,crawler()但找不到一个好方法。我还尝试找到一些不同的方法(一些内存通道绑定的工作池等),这里也没有运气。

4

2 回答 2

1

这里至少有两个问题。

首先是您关于在通道为空时停止的假设。由于您分配大小为 0 的内存通道,因此它将始终为空。如果爬虫准备好接收它,您只能交出一个 url。

这就产生了第二个问题。如果你发现的 url 比你分配的爬虫多,你的应用程序就会死锁。

原因是,由于您无法将所有找到的 url 交给爬虫,爬虫永远不会准备好接收要爬取的新 url,因为它被卡在等待另一个爬虫获取它的一个 url。

这会变得更糟,因为假设其他爬虫之一找到新的 url,它们也会被困在已经等待交出其 url 的爬虫后面,并且他们将永远无法获取正在等待的 url 之一处理。

文档的相关部分:

https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels

假设我们解决了这个问题,下一步该去哪里?

您可能需要保留所有访问过的 url 的列表(设置?),以确保您不会再次访问它们。

要真正弄清楚何时停止,而不是关闭通道,简单地取消托儿所可能要容易得多。

假设我们像这样修改主循环:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(3) # Number of workers
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            while True:
                await trio.sleep(1) # Give the workers a chance to start up.
                if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                    nursery.cancel_scope.cancel() # All done!

现在我们需要稍微修改爬虫,以便在活动时获取令牌。

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        with active_workers:
            content = await ...
            urls_found = ...
            for u in urls_found:
                await send_channel.send(u)  # I'm a producer too!

其他需要考虑的事情——

您可能想send_channel.send_noblock(u)在爬虫中使用。由于您有一个无界缓冲区,因此不会出现 WillBlock 异常,并且每次发送都没有检查点触发器的行为可能是可取的。这样您就可以肯定地知道,在其他任务有机会获取新 url 或父任务有机会检查工作是否完成之前,特定 url 已完全处理并且所有新 url 都已添加。

于 2020-12-15T16:16:10.903 回答
0

这是我尝试重新组织问题时提出的解决方案:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
 
    limit = trio.CapacityLimiter(3)

    async with send_channel:
        await send_channel.send(('https://start-url', send_channel.clone()))
    #HERE1

    async with trio.open_nursery() as nursery:
        async for url, send_channel in receive_channel:  #HERE3
            nursery.start(consumer, url, send_channel, limit)

async def crawler(url, send_channel, limit, task_status):
    async with limit, send_channel:
        content = await ...
        links = ...
        for link in links:
            await send_channel.send((link, send_channel.clone()))
    #HERE2

(我跳过了访问过的网址)

这里没有 3 个长寿命消费者,但只要有足够的工作量,最多有 3 个消费者。

在#HERE1,send_channel 被关闭(因为它被用作上下文管理器),唯一保持通道活动的是它的克隆,在该通道内。

在#HERE2,克隆也被关闭(因为上下文管理器)。如果通道为空,则该克隆是保持通道活动的最后一件事。通道死亡,for 循环结束 (#HERE3)。

除非找到 url,在这种情况下,它们被添加到通道中,以及更多的 send_channel 克隆,这将使通道保持足够长的时间来处理这些 url。

这和 Anders E. Andersen 的解决方案对我来说都感觉很老套:一个正在使用sleepand statistics(),另一个创建 send_channel 的克隆并将它们放入通道中......对我来说感觉就像 klein bottle 的软件实现。我可能会寻找其他一些方法。

于 2020-12-16T12:21:34.593 回答