我有 2 个进程:第一个在其上创建全局分布式客户端;第二个进程是一个网络爬虫,它应该获取全局客户端并向它提交任务,当一切都完成后,它会向另一个进程发送一条消息,告诉它他可以继续。
from dask.distributed import Client, as_completed
from multiprocessing import Process
from time import sleep
import zmq
def get(url) -> dict:
# downloads data from url
time.sleep(3)
return data
def save(data) -> None:
# saves data locally
time.sleep(3)
return None
def scraper(urls):
# global client
client = get_client()
# zeromq socket
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://*:port')
while True:
for future, result in as_completed([client.submit(get, url=url) for url in urls], with_results=True):
save(data=result)
socket.send_string('All job is done for this minute, proceed.')
sleep(60)
if __name__ == '__main__':
client = Client()
s = Process(target=scraper, *args, **kwargs)
s.start()
问题是从刮板功能我可以获得全局客户端(如果我打印它,我可以正确看到它),但我不能向它提交任何类型的任务。控制台没有打印任何错误,它只是卡住了,什么也不做。我认为原因是刮板功能在单独的 multiprocessing.Process 上运行。
任何解决方案或解决方法?谢谢你。