1

我有 2 个进程:第一个在其上创建全局分布式客户端;第二个进程是一个网络爬虫,它应该获取全局客户端并向它提交任务,当一切都完成后,它会向另一个进程发送一条消息,告诉它他可以继续。

from dask.distributed import Client, as_completed
from multiprocessing import Process
from time import sleep
import zmq


def get(url) -> dict:
   # downloads data from url
   time.sleep(3)

   return data


def save(data) -> None:
    # saves data locally
    time.sleep(3)

    return None


def scraper(urls):
    # global client
    client = get_client()

    # zeromq socket
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind('tcp://*:port')

    while True:
        for future, result in as_completed([client.submit(get, url=url) for url in urls], with_results=True):
        save(data=result)
        socket.send_string('All job is done for this minute, proceed.')
        sleep(60)


if __name__ == '__main__':

    client = Client()

    s = Process(target=scraper, *args, **kwargs)
    s.start()

问题是从刮板功能我可以获得全局客户端(如果我打印它,我可以正确看到它),但我不能向它提交任何类型的任务。控制台没有打印任何错误,它只是卡住了,什么也不做。我认为原因是刮板功能在单独的 multiprocessing.Process 上运行。

任何解决方案或解决方法?谢谢你。

4

1 回答 1

0

dask 客户端保持与调度程序的打开连接。根据您的系统如何创建新进程,您可能会获得在新进程中没有任何用处的连接副本,或者无法完全传输客户端(它不可腌制)。

相反,您应该将连接信息发送到子进程

addr = c.scheduler_info()['address']

在目标函数中做

client = Client(addr)
于 2020-01-09T21:04:16.863 回答