0

我使用该类来调度跨多个内核pathos ProcessingPool的函数的并发执行。run_regex()该函数将正则表达式作为参数并计算匹配的列表条目。如果找到匹配项,它将匹配值放入result_queue.

result_queue据我了解,目前每个工作进程都会在其虚拟地址空间中创建一个本地副本。但是,我想将此 Queue 对象用作共享内存机制,以便从主进程访问所有匹配项。

问题:

  1. 有没有办法将 Queue 对象传递给 Pool 初始化程序,以便队列充当共享内存部分?
  2. 队列对象是否需要同步?
  3. 有没有更好的方法来解决这个问题?

代码片段

from multiprocessing import Lock, Queue
from pathos.multiprocessing import ProcessingPool

result_queue = Queue()
lock = Lock()
data = {}

def run_regex(self, expr):

for key, value in data.iteritems():
    matchStr = re.search(expr, key, re.I)
    if matchStr:
        lock.acquire()
        result_queue.put(key)
        lock.release()
        break

def check_path(self):

    pool = ProcessingPool()
    pool.map(run_regex, in_regex)
4

1 回答 1

1
  1. 是的,你可以看一下对象的初始化参数Pool
  2. Queue对象已经是 mp 安全的,因此无需保护它们。
  3. 您不需要 a从函数Queue返回值。run_regex您可以简单地key从函数中返回,结果将使其可用map

    def run_regex(expr):
        group = []
    
        for key, value in data.iteritems():
            match = re.search(expr, key, re.I)
            if match is not None:
                group.append(key)
    
        return group
    
    groups = pool.map(run_regex, in_regex)
    keys = [key for group in groups for key in group]
    

    或者

    keys = list(itertools.chain.from_iterable(groups))
    

    map返回按 分组的键run_regex。之后您可以轻松地展平列表。

于 2018-08-16T07:36:24.780 回答