1

我正在尝试使用 tokio 异步运行时在 Rust 中编写一个网络爬虫。我想异步获取/处理多个页面,但我也希望爬虫在到达末尾时停止(换句话说,如果没有什么可爬的)。到目前为止,我已经使用futures::future::try_join_all从我提供为Futures 的异步函数中获取集体结果,但这显然需要程序事先知道要抓取的总页数。例如:

async fn fetch(_url: String) -> Result<String, ()> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    Ok(String::from("foo"))
}

#[tokio::main]
async fn main() {
    let search_url = "https://example.com/?page={page_num}";

    let futures = (1..=3)
        .map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
        .map(|url| fetch(url));

    let _ = futures::future::try_join_all(futures).await.unwrap();
}

锈游乐场

在这个简单的示例中,我必须知道要通过 ( 1..=3) 的总页数,然后才能实际获取它们。我想要的是,不提供任何范围并且有条件停止整个过程。(例如,如果 HTML 结果包含“未找到”)

我查看了futures::executor::block_on但我不确定它是否可以用于这项任务。

4

1 回答 1

1

以下是使用Streamand的大致方法.buffered()

use futures::{future, stream, StreamExt};

#[derive(Debug)]
struct Error;

async fn fetch_page(page: i32) -> Result<String, Error> {
    println!("fetching page: {}", page);

    // simulate loading pages
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    if page < 5 {
        // successfully got page
        Ok(String::from("foo"))
    } else {
        // page doesn't exist
        Err(Error)
    }
}

#[tokio::main]
async fn main() {
    let pages: Vec<String> = stream::iter(1..)
        .map(fetch_page)
        .buffered(10)
        .take_while(|page| future::ready(page.is_ok()))
        .map(|page| page.unwrap())
        .collect()
        .await;

    println!("pages: {:?}", pages);
}

我将main()详细介绍这些步骤:

运行上面的代码会打印出以下内容,表明它一次尝试 10 次,但只会返回到第一次失败:

fetching page: 1
fetching page: 2
fetching page: 3
fetching page: 4
fetching page: 5
fetching page: 6
fetching page: 7
fetching page: 8
fetching page: 9
fetching page: 10
pages: ["foo", "foo", "foo", "foo"]

这掩盖了一些不错的东西,比如处理非缺失页面错误或重试,但我希望这能给你一个良好的基础。在这些情况下,您可能会使用TryStreamExt专门处理Results 流的方法。

于 2021-09-25T01:00:11.037 回答