4

我必须提前道歉,因为这个问题很笼统,可能不够清楚。问题是:如何并行运行一个 Python 函数,该函数本身使用一个进程池来处理某些子任务并执行大量繁重的 I/O 操作?它甚至是一个有效的任务吗?

我将尝试提供更多信息。我有一个程序,比如说test_reduce(),我需要并行运行。我尝试了几种方法来做到这一点(见下文),我似乎缺乏一些知识来理解为什么所有这些方法都失败了。

这个test_reduce()过程做了很多事情。其中一些比其他更与问题相关(我在下面列出):

  • 它使用multiprocessing模块(sic!),即一个pool.Pool实例,
  • 它使用 MongoDB 连接,
  • 它严重依赖numpyscikit-learn库,
  • 它使用回调和 lambda,
  • 它使用dill库来腌制一些东西。

首先我尝试使用一个multiprocessing.dummy.Pool(这似乎是一个线程池)。我不知道这个池有什么特别之处,为什么它是,嗯,“虚拟”;整个事情都奏效了,我得到了我的结果。问题是CPU负载。对于所有内核的并行化部分,test_reduce()它是 100%;对于同步部分,大部分时间约为 40-50%。我不能说这种“并行”执行的整体速度有任何提高。

然后我尝试使用这个程序的multiprocessing.pool.Pool实例来map处理我的数据。它失败了:

File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

我猜测这cPickle是罪魁祸首,并找到了pathos使用更高级 pickler的库dill。然而它也失败了:

File "/local/lib/python2.7/site-packages/dill/dill.py", line 199, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1083, in load_newobj
    obj = cls.__new__(cls, *args)
TypeError: object.__new__(generator) is not safe, use generator.__new__()

现在,这个错误是我完全不明白的。当它在池中工作时,我的程序没有输出stdout,所以很难猜测发生了什么。我唯一知道的是,test_reduce()当不使用多处理时,它会成功运行。

那么,你将如何并行运行如此繁重和复杂的东西?

4

1 回答 1

1

因此,感谢@MikeMcKerns 的回答,我找到了如何使用pathoslib 完成工作。我需要摆脱所有pymongo游标,它们(作为生成器)不能被dill; 这样做解决了问题,我设法并行运行我的代码。

于 2015-07-08T13:09:49.630 回答