
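I've started using pathos.multiprocessing to relieve some of the headache of working with important objects such as logging.logger. However, I'm running into a problem where imports are not available in the new worker process, so I have to "re-import" those modules inside it. Specifically, this case will not run because of the following error: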

NameError: global name 'os' is not defined
Found at: multiprocess.pool
    result = True, func(*args, **kwds)

The following code does not work, even with the built-in multiprocessing module:

import os
import multiprocessing
from pathos.multiprocessing import ProcessPool

class SomethingDoer:
    '''Probably does some thing'''
    def doAllSomethings(self, allSomethings):
        cores = multiprocessing.cpu_count()-1
        pool = ProcessPool(nodes=cores)
        for something in allSomethings:
            someProcess = self.doSomething, something
            pool.apipe(*someProcess)

    def doSomething(self,something):
        pathString = os.path.join(r"\foo%i"%something,"bar.log")
        print(pathString)

However, the following code does work with pathos.multiprocessing:

import multiprocessing
from pathos.multiprocessing import ProcessPool

class SomethingDoer:
    '''Probably does some thing'''
    def doAllSomethings(self,allSomethings):
        cores = multiprocessing.cpu_count()-1
        pool = ProcessPool(nodes=cores)
        for something in allSomethings:
            someProcess = self.doSomething, something
            pool.apipe(*someProcess)

    def doSomething(self,something):
        import os
        pathString = os.path.join(r"\foo%i"%something,"bar.log")
        print(pathString)

Run with the following:

import time
p = SomethingDoer()
p.doAllSomethings(range(3))
while 1:
    time.sleep(.1)

Both versions should print the following (not necessarily in this order, of course):

\foo0\bar.log
\foo1\bar.log
\foo2\bar.log

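I'd like to be able to use some variant of the first version, since it is consistent with the rest of the codebase. That said, any help is greatly appreciated.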


I'm the pathos author. If you update pathos so that it uses multiprocess (rather than processing, which is a much older fork), your code should work as expected.

>>> import os
>>> import pathos.helpers as multiprocessing
>>> from pathos.multiprocessing import ProcessPool
>>> 
>>> class SomethingDoer:
...     '''Probably does some thing'''
...     def doAllSomethings(self, allSomethings):
...         cores = multiprocessing.cpu_count()-1
...         pool = ProcessPool(nodes=cores)
...         for something in allSomethings:
...             someProcess = self.doSomething, something
...             pool.apipe(*someProcess)
...     def doSomething(self,something):
...         pathString = os.path.join(r"\foo","bar.log")
... 
>>> p = SomethingDoer()
>>> p.doAllSomethings(range(3))
>>> p.doSomething(range(3))
>>> 
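To check which of the two forks mentioned above your environment can actually import, the standard library's import system can be queried without importing either package. This is a minimal sketch; `available_backends` is a hypothetical helper, not part of pathos.

```python
import importlib.util


def available_backends(names=("multiprocess", "processing")):
    """Return the names from `names` that are importable here (hypothetical helper)."""
    found = []
    for name in names:
        # find_spec returns None when a top-level module cannot be found
        if importlib.util.find_spec(name) is not None:
            found.append(name)
    return found


if __name__ == "__main__":
    print(available_backends())
```

If multiprocess is missing from the result, upgrading pathos (recent releases depend on multiprocess) is the fix suggested above.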

Note that you may want to get the latest version of dill, which has better pickle-tracing of globals. To change how dill (and therefore pathos) handles globals, you can do this:

>>> import dill
>>> dill.settings['recurse'] = True

There's no guarantee which setting works best for your case, but it gives you options.
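Since the best setting depends on the case, it can help to try `recurse=True` around one call site instead of flipping it for the whole process. The sketch below is a small context manager that sets a key in a settings dict and restores the previous value on exit; `temporary_setting` is a hypothetical helper, not part of dill, and it only assumes that `dill.settings` is a plain dict.

```python
from contextlib import contextmanager


@contextmanager
def temporary_setting(settings, key, value):
    """Temporarily set settings[key] = value, restoring the old state on exit."""
    missing = object()
    old = settings.get(key, missing)
    settings[key] = value
    try:
        yield settings
    finally:
        # Restore even if the body raised
        if old is missing:
            del settings[key]
        else:
            settings[key] = old


if __name__ == "__main__":
    try:
        import dill
    except ImportError:
        dill = None
    if dill is not None:
        with temporary_setting(dill.settings, "recurse", True):
            print("recurse inside:", dill.settings["recurse"])
        print("recurse after:", dill.settings["recurse"])
```

A pool may serialize tasks in a background thread after `apipe` returns, so if you try this with pathos, keep both the submission and the `.get()` calls inside the `with` block.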

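EDIT: in response to your update, this works. Notice that here I call get() on each result instead of keeping the pool alive, so the script waits for each response to be retrieved.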

import os
import pathos.helpers as multiprocessing
from pathos.multiprocessing import ProcessPool

class SomethingDoer:
    '''Probably does some thing'''
    def doAllSomethings(self,allSomethings):
        cores = multiprocessing.cpu_count()-1
        pool = ProcessPool(cores)
        for something in allSomethings:
            someProcess = self.doSomething, something
            pool.apipe(*someProcess).get()
        # pool.clear()

    def doSomething(self,something):
        pathString = os.path.join(r"\foo%i"%something,"bar.log")
        print(pathString)


if __name__ == '__main__':

    import time
    p = SomethingDoer()
    print(p.doAllSomethings(range(3)))
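Calling `.get()` right after each `apipe` blocks until that task has finished before the next one is submitted, so the tasks run one at a time. A common alternative is to submit everything first and only then collect the results. The sketch below shows that pattern; `run_all` and `make_path` are hypothetical names, and to stay self-contained it is exercised with the standard library's `multiprocessing.pool.ThreadPool`, whose `apply_async(...).get()` plays the same role as pathos's `apipe(...).get()`.

```python
import os
from multiprocessing.pool import ThreadPool


def run_all(submit, func, items):
    """Submit func(item) for every item first, then wait for all results.

    `submit(func, item)` must return an object with a .get() method, for
    example pathos's pool.apipe or a thin wrapper around apply_async.
    """
    # Phase 1: queue every task without blocking, so they can overlap
    pending = [submit(func, item) for item in items]
    # Phase 2: block on each result, in submission order
    return [result.get() for result in pending]


def make_path(something):
    # Same path logic as doSomething, but returned instead of printed
    return os.path.join(r"\foo%i" % something, "bar.log")


if __name__ == "__main__":
    with ThreadPool(3) as pool:
        def submit(func, item):
            return pool.apply_async(func, (item,))

        for path in run_all(submit, make_path, range(3)):
            print(path)
```

Because pathos's `apipe` takes the function followed by its arguments, it could be passed as `submit` directly, e.g. `return run_all(pool.apipe, self.doSomething, allSomethings)` inside `doAllSomethings`.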

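The other option is to skip the get() and uncomment the clear() instead, which essentially destroys the pool. pathos keeps pools in a singleton, so repeated runs pay less overhead for setting up a pool each time; clear() discards that cached pool.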

The output I get isn't exactly what you predicted:

>$ python testme.py 
\foo0/bar.log
\foo1/bar.log
\foo2/bar.log
None
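The mixed separators come from the platform rather than from pathos: `os.path` is `posixpath` on Linux and macOS and `ntpath` on Windows. `posixpath.join` inserts `/` and treats the backslash in `r"\foo0"` as an ordinary character, while `ntpath.join` inserts `\`, which matches the output predicted in the question. Both modules can be imported on any platform, so the difference is easy to check directly; `joined` is just a hypothetical helper for the comparison. The trailing `None` is the return value of `doAllSomethings`, which the script prints.

```python
import ntpath
import posixpath


def joined(i):
    """Join the question's path pieces under POSIX and Windows rules."""
    head = r"\foo%i" % i
    return posixpath.join(head, "bar.log"), ntpath.join(head, "bar.log")


if __name__ == "__main__":
    for i in range(3):
        posix_path, nt_path = joined(i)
        print("posix:", posix_path)  # e.g. posix: \foo0/bar.log
        print("nt:   ", nt_path)     # e.g. nt:    \foo0\bar.log
```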

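If you find this to be unexpected behavior, and not correct, please file a bug report. Oh, and the settings I suggested made no difference, nor did using a "new-style" class, SomethingDoer(object).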

answered 2015-06-30T17:27:18.037