我必须提前道歉,因为这个问题很笼统,可能不够清楚。问题是:如何并行运行一个 Python 函数,该函数本身使用一个进程池来处理某些子任务并执行大量繁重的 I/O 操作?它甚至是一个有效的任务吗?
我将尝试提供更多信息。我有一个程序,比如说test_reduce()
,我需要并行运行。我尝试了几种方法来做到这一点(见下文),我似乎缺乏一些知识来理解为什么所有这些方法都失败了。
这个test_reduce()
过程做了很多事情。其中一些比其他更与问题相关(我在下面列出):
- 它使用
multiprocessing
模块(sic!),即一个pool.Pool
实例, - 它使用 MongoDB 连接,
- 它严重依赖
numpy
和scikit-learn
库, - 它使用回调和 lambda,
- 它使用
dill
库来腌制一些东西。
首先我尝试使用一个multiprocessing.dummy.Pool
(这似乎是一个线程池)。我不知道这个池有什么特别之处,为什么它是,嗯,“虚拟”;整个事情都奏效了,我得到了我的结果。问题是CPU负载。对于所有内核的并行化部分,test_reduce()
它是 100%;对于同步部分,大部分时间约为 40-50%。我不能说这种“并行”执行的整体速度有任何提高。
然后我尝试使用这个程序的multiprocessing.pool.Pool
实例来map
处理我的数据。它失败了:
File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
cPickle.PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
我猜测这cPickle
是罪魁祸首,并找到了pathos
使用更高级 pickler的库dill
。然而它也失败了:
File "/local/lib/python2.7/site-packages/dill/dill.py", line 199, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1083, in load_newobj
obj = cls.__new__(cls, *args)
TypeError: object.__new__(generator) is not safe, use generator.__new__()
现在,这个错误是我完全不明白的。当它在池中工作时,我的程序没有输出stdout
,所以很难猜测发生了什么。我唯一知道的是,test_reduce()
当不使用多处理时,它会成功运行。
那么,你将如何并行运行如此繁重和复杂的东西?