51

我正在尝试在多个文件上并行运行一些 python 代码。构造基本上是:

def process_file(filename, foo, bar, baz=biz):
    # do stuff that may fail and cause exception

if __name__ == '__main__':
    # setup code setting parameters foo, bar, and biz

    psize = multiprocessing.cpu_count()*2
    pool = multiprocessing.Pool(processes=psize)

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:])
    pool.close()
    pool.join()

我以前使用 pool.map 做类似的事情并且效果很好,但我似乎不能在这里使用它,因为 pool.map 不允许(似乎)允许我传递额外的参数(并使用 lambda这样做是行不通的,因为 lambda 无法编组)。

所以现在我试图让事情直接使用 apply_async() 来工作。我的问题是代码似乎挂起并且永远不会退出。一些文件因异常而失败,但我不明白为什么会导致连接失败/挂起?有趣的是,如果没有一个文件因异常而失败,它确实会干净地退出。

我错过了什么?

编辑:当函数(以及工作人员)失败时,我看到这个异常:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())

如果我看到其中一个,进程父进程将永远挂起,永远不会收获孩子并退出。

4

3 回答 3

56

很抱歉回答我自己的问题,但我至少找到了一种解决方法,所以如果其他人有类似的问题,我想在这里发布。我会接受任何更好的答案。

我相信问题的根源是http://bugs.python.org/issue9400。这告诉我两件事:

  • 我没有疯,我真正想做的应该是有效的
  • 至少在 python2 中,如果不是不可能的话,将“异常”返回到父进程是非常困难的。简单的工作,但许多其他人没有。

就我而言,我的工作函数正在启动一个存在段错误的子进程。这返回了 CalledProcessError 异常,这是不可腌制的。出于某种原因,这使得父对象中的池对象出去吃午饭,而不是从对 join() 的调用中返回。

在我的特殊情况下,我不在乎异常是什么。最多我想记录它并继续前进。为此,我只需将我的顶级工作函数包装在 try/except 子句中。如果工作进程抛出任何异常,它会在尝试返回父进程之前被捕获,记录,然后工作进程正常退出,因为它不再尝试发送异常。见下文:

def process_file_wrapped(filenamen, foo, bar, baz=biz):
    try:
        process_file(filename, foo, bar, baz=biz)
    except:
        print('%s: %s' % (filename, traceback.format_exc()))

然后,我有我的初始映射函数调用 process_file_wrapped() 而不是原来的。现在我的代码按预期工作。

于 2013-03-09T21:51:48.867 回答
5

在需要腌制对象的情况下,您实际上可以使用functools.partial实例而不是 a 。从 Python 2.7(和 Python 3)开始,对象是可腌制的。lambdapartial

pool.map(functools.partial(process_file, x, foo, bar, baz=biz), sys.argv[1:])
于 2013-03-10T02:07:02.680 回答
3

pool.map对于它的价值,挂起时我有一个类似的错误(不一样) 。我的用例允许我使用pool.terminate来解决它(在改变东西之前确保你的也一样)。

我在打电话之前使用了 pool.mapterminate ,所以我知道一切都完成了,来自文档

map() 内置函数的并行等效项(尽管它仅支持一个可迭代参数)。它阻塞,直到结果准备好。

如果那是您的用例,这可能是修补它的一种方法。

于 2014-12-16T18:01:34.703 回答