我不知道您是如何使用Pool.map
的,但请注意,这Pool.map
不适用于大量输入。如您所见,在 Python 3.6 中,它在Lib/multiprocessing/pool.pyiterable
中实现,它声明它需要一个作为第一个参数,但实现确实在运行多进程映射之前消耗了整个迭代。因此,我认为Pool.map
如果您需要处理大量数据,则不需要使用它。也许Pool.imap
并且Pool.imap_unordered
可以工作。
关于你的实际问题。我有一个不涉及Pool.map
并且工作方式类似于multiprocessforeach
的解决方案。
首先需要继承Pool
并创建一个工作进程:
from multiprocessing import cpu_count
from multiprocessing import Queue
from multiprocessing import Process
class Worker(Process):
english = spacy.load('en')
def __init__(self, queue):
super(Worker, self).__init__()
self.queue = queue
def run(self):
for args in iter(self.queue.get, None):
# process args here, you can use self.
您像这样准备进程池:
queue = Queue()
workers = list()
for _ in range(cpu_count()): # minus one if the main processus is CPU intensive
worker = Worker(queue)
workers.append(worker)
worker.start()
然后你可以通过以下方式喂池queue
:
for args in iterable:
queue.put(args)
iterable
是您传递给工作人员的参数列表。上面的代码将iterable
尽可能快地推送内容。基本上,如果工人足够慢,几乎所有的迭代都会在工人完成工作之前被推送到队列中。这就是为什么iterable 的内容必须适合 memory的原因。
如果工人参数(又名。iterable
)不能放入内存中,您必须以某种方式同步主进程和工人......
最后确保调用以下命令:
for worker in workers:
queue.put(None)
for worker in workers:
worker.join()