我最近为我的研究编写了一个带有类的程序,并且我试图将它并行化。当我将 Python 2.7 的 multiprocessing.Process 与 JoinableQueue 和托管数据一起使用时,我的程序最终会因已失效的进程而挂起。
import multiprocessing as mp
import traceback
class Paramfit(object):
def __init__(self):
pass
def _calc_bond(self, index):
# Calculate data
def _calc_parallel(self, index):
self._calc_bond(index)
def run(self):
for ts, force in itertools.izip(self.coortrj, self.forcevec):
try:
consumers = [mp.Process(target=self._calc_parallel,
args=(force,)) for i in range(nprocs)]
for w in consumers:
w.start()
# Enqueue jobs
for i in range(self.totalsites):
self.tasks.put(i)
# Add a poison pill for each consumer
for i in range(nprocs):
self.tasks.put(None)
self.tasks.close()
self.tasks.join()
# for w in consumers:
# w.join()
except:
traceback.print_exc()
_calc_parallel 调用其他一些类方法。
我什至尝试使用 multiprocessing.Pool 为此目的使用http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods上其他地方的 copy_reg 选项。
import multiprocessing as mp
import traceback
class Paramfit(object):
def __init__(self):
pass
def _calc_bond(self, index):
# Calculate data
def _use_force(force):
# Calculate data
def _calc_parallel(self, index, force):
self._calc_bond(index)
self._use_force(force)
def run(self):
try:
pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
args = itertools.izip(range(self.totalsites), itertools.repeat(force))
pool.map_async(self._calc_parallel, args)
pool.close()
pool.join()
except:
traceback.print_exc()
但是,pool.map_async 似乎没有调用 self._calc_parallel。我知道在这两种情况下(进程和池),我都忽略了一些东西,但我不太清楚是什么。我通常处理超过 40,000 个元素。
谢谢您的帮助。
更新
在阅读了其他几篇文章后,我也尝试了 pathos.multiprocessing。
import pathos.multiprocessing as mp
class Paramfit(object):
def __init__(self):
pass
def _calc_bond(self, index):
# Calculate data
def _use_force(force):
# Calculate data
def _calc_parallel(self, index, force):
self._calc_bond(index)
self._use_force(force)
def run(self):
try:
pool = mp.ProcessingPool(nprocs)
args = itertools.izip(range(self.totalsites), itertools.repeat(force))
pool.amap(lambda x: self._calc_parallel(*x), args)
except:
traceback.print_exc()
而且,与我之前的尝试一样,这似乎也可以在不调用该方法的情况下快速完成。
更新 2
我决定修改代码,将我的庞然大物类拆分成更小、更易于管理的组件。但是,如果我使用 pathos.multiprocessing,我会遇到与之前发布的不同的情况(请参阅链接)。我的新代码现在有一个可用于计算的对象,然后通过其方法返回一个值。
import itertools
import pandas as pd
import pathos.multiprocessing as mp
class ForceData(object):
def __init__(self, *args, **kwargs):
# Setup data
self.value = pd.DataFrame()
def calculateBondData(self, index):
# Calculation
return self.value
def calculateNonBondedData(self, index):
# Calculation
return self.value
def calculateAll(self, index):
# Because self.value is a pandas.DataFrame, changed internally
self.calculateBondData(index)
self.calculateNonBondedData(index)
return self.value
class ForceMatrix(object):
def __init__(self, *args, **kwargs):
# Initialize data
self._matrix = pd.DataFrame()
def map(self, data):
for value in data.get():
for i, j in itertools.product(value.index, repeat=2):
self._matrix.loc[[i], [j]] += value.values
def calculate(self, *args, **kwargs):
# Setup initial information.
fd = ForceData()
matrix = ForceMatrix()
pool = mp.ProcessingPool()
data = pool.amap(fd.calculateAll, range(x))
matrix.map(data, force)
return matrix
我认为这是一个单独的功能func(dataobj, force)
,但这似乎也无济于事。以目前的速度,我估计单个处理器上的完整计算需要 1000 多个小时,这对于应该更快的东西来说太长了。
更新 3 (4/30/15)
由于@MikeMcKerns 有用的见解,我可能已经确定了一个可能的解决方案。在 iMac(四核)或集群的 16 核节点上,我发现对于没有键的粗粒度(CG)系统,双倍itertools.imap
似乎是我最好的解决方案(1000 个 CG 站点)时钟每个轨迹帧大约 5.2 秒。当我进入一个包含一些键细节的系统(3000 个代表水的 CG 站点)时,我发现在 iMac 上(使用 1 个核心),itertools.imap
然后是pathos.ThreadingPool.uimap
(4 个线程)以大约 85 秒/帧的速度运行;如果我按照@MikeMcKerns 的评论中的建议尝试进程池(4 核 x 2)/线程池(4 线程),计算时间增加了 2.5 倍。在 16 核集群(32 pp/16 tp)上,这个 CG 系统运行速度也很慢(大约 160 秒/帧)。在 iMac(1 核/4 线程)上具有 42,778 个站点和众多键的 CG 系统可能会以大约 58 分钟/帧的速度运行。我还没有在集群的 16 核节点上测试这个大型系统,但我不确定是否使用进程池/线程池来进一步加快速度。
例子:
# For a CG system with no bond details
for i in range(nframes):
data1 = itertools.imap(func1, range(nsites))
data2 = itertools.imap(func2, data1)
for values in data2:
func3(values)
# For a system with bond details
import pathos.multiprocessing as mp
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
data1 = itertools.imap(func1, range(nsites))
data2 = tpool.uimap(func2, data1)
for values in data2:
func3(values)
# Seems to be the slowest in the bunch on iMac and possibly on 16-cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
data1 = ppool.uimap(func1, range(nsites))
data2 = tpool.uimap(func2, data1)
for values in data2:
func3(values)
我怀疑系统越大,我从多处理中获得的好处就越多。我知道大型 CG 系统(42,778 个站点)大约需要 0.08 秒/站点,而 0.02 秒/站点(3000 个 CG 站点)或 0.05 秒/站点(1000 个站点无键)。
在我努力减少计算时间的过程中,我发现了可以减少一些计算的区域(例如,global
变量和算法更改),但是如果我可以通过全面的多线程处理进一步减少它,那就太好了。