4

我有一段现有的 Python 代码,它在我的机器的内核上并行运行。它完成的工作基本上是打开一个输入文件,读取内容,执行一些相当繁重的数学运算,将结果写入输出文件,在 for 循环中获取下一个文件并再次执行。为了使这个在许多内核上并行,我使用Poolmultiprocessing库中的函数。举个简单的例子:

import multiprocessing
import time

data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(8)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

这个例子只是用来展示我是如何multiprocessing.Pool跨 8 个内核实现这个功能的。本质上mp_worker,我的代码中的函数要复杂得多,但你明白我的意思。

我开始意识到我正在处理的网络中有几台机器在 99% 的时间里都处于空闲状态。因此,我想知道是否有办法在这段代码中利用他们的核心以及我的本地核心。

在伪代码中,代码可能会变成这样:

def mp_handler():
    p = multiprocessing.Pool(servers=['local host', 192.168.0.1, 192.168.0.2], ncores=[8,8,4])
    p.map(mp_worker, data)

我现在可以在其中指定我的本地计算机和其他 IP 地址作为服务器以及我想在每台计算机上使用的内核数。

由于我网络上的其他机器归我所有,并且没有连接到 Internet,因此出于安全目的,我不会对使用 SSH 大惊小怪。

谷歌搜索我注意到pathosandscoop库可能可以帮助我解决这个问题。它看起来与真正吸引我pathos的库具有非常相似的命令。multiprocessing但是,在这两种情况下,我都找不到一个简单的示例来展示如何将本地并行作业转换为分布式并行作业。我渴望尽可能靠近multiprocessing图书馆的池/地图功能。

任何帮助或示例将不胜感激!

4

1 回答 1

3

来自的示例pathos非常类似于您的伪代码。

from pathos.parallel import stats
from pathos.parallel import ParallelPool as Pool
pool = Pool()

def host(id):
    import socket
    import time
    time.sleep(1.0)
    return "Rank: %d -- %s" % (id, socket.gethostname())


print "Evaluate 10 items on 2 cpus"
pool.ncpus = 2
pool.servers = ('localhost:5653',)
res5 = pool.map(host, range(10))
print pool
print '\n'.join(res5)
print stats()
print ''

上面,您可以在初始化实例时将ncpusand设置servers为关键字。Pool

结果如下所示:

Evaluate 10 items on 2 cpus
<pool ParallelPool(ncpus=2, servers=('localhost:5653',))>
Rank: 0 -- hilbert.local
Rank: 1 -- hilbert.local
Rank: 2 -- hilbert.local
Rank: 3 -- hilbert.local
Rank: 4 -- hilbert.local
Rank: 5 -- hilbert.local
Rank: 6 -- hilbert.local
Rank: 7 -- hilbert.local
Rank: 8 -- hilbert.local
Rank: 9 -- hilbert.local
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
        10 |        100.00 |      10.0459 |     1.004588 | local
Time elapsed since server creation 5.0402431488
0 active tasks, 2 cores

如果您有多个服务器,并且可能具有远程服务器,则只需向servers元组添加更多条目。所以这不是一个完美的例子,因为它没有准确地展示如何让服务器在另一台机器上运行。但是,这是一个很好的例子,如果您确实计划使用 ssh tunnel,您应该知道您不指向pathos远程机器,而是指向localhost隧道端口……并且连接到远程机器。

由于pathos使用ppft(这是 的一个分支pp),您可以查看pp有关如何设置远程服务器的示例。基本上,您可以使用 shell 脚本执行以下操作:

for i in $nodes
do
    ssh -f $i /home/username/bin/ppserver.py -p $portnum -w 2 -t 30 &
done

这里循环在接收到的节点(节点)上。对于每个节点,ssh -f使用一个命令以ppserver指定端口(-p)、两个工作人员(-w)和空闲 30 秒后超时(-t)启动 a。请参阅pp文档 ( http://www.parallelpython.com/content/view/15/30 )。使用pathos,您只需要启动 appserver并指定端口即可使其工作。然后,您将主机名和端口添加到server第一个代码块中的元组中。

但是,如果您不喜欢手动设置,请pathos提供设置 atunnel和 a 的脚本ppserver。使用脚本比手动执行要灵活一些,并且在出现问题时更难以诊断……但尽管如此……请参阅此处的脚本:https ://github.com/uqfoundation/pathos/tree/master/scripts .

于 2016-03-23T13:44:42.273 回答