6

我有一个关于 Python 多处理的问题。我正在尝试获取数据集,分成块,然后将这些块传递给同时运行的进程。我需要使用简单的计算来转换大型数据表(例如,电阻 -> 热敏电阻的温度)。

下面列出的代码几乎可以按预期工作,但它似乎没有产生任何新进程(或者如果一次只有一个)。

from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3
    
    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()
        
if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000) # null dataset
    processes = 3
    
    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        tmp = dataset[i * chunk : (i + 1) * chunk + remainder]
        exec('worker'+str(i)+' = Worker(tmp)')
        exec('worker'+str(i)+'.start()')
        
    for i in range(processes):
        exec('worker'+str(i)+'.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker'+str(i)+'.result[0]')
4

3 回答 3

1

您还没有覆盖该run方法。进程(或线程)有两种方法可以让它执行代码:

  1. 创建进程指定目标
  2. 子类化过程,覆盖run方法。

压倒__init__一切只是意味着您的流程已经装满无处可去。它应该被用来赋予它执行它需要执行的任务所需的属性,但它不应该指定要执行的任务。

在您的代码中,所有繁重的工作都在这一行中完成:

exec('worker'+str(i)+' = Worker(tmp)')

这里什么也没做:

exec('worker'+str(i)+'.start()')

所以检查结果exec('print worker'+str(i)+'.result[0]')应该会给你一些有意义的东西,但这只是因为你想要执行的代码已经执行了,但是在进程构建时,而不是在进程启动时。

试试这个:

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        self.result = map(self.process, self.data)

编辑:

好吧...所以我只是根据我的穿线直觉在这里飞行,他们都错了。我们都对流程不了解的是,您不能直接共享变量。无论您传递给新进程以启动什么,都会被读取、复制并永远消失。除非您使用两种标准方法之一来共享数据:队列和管道。我已经尝试了一些尝试让您的代码正常工作,但到目前为止还没有运气。我认为这会让你走上正确的轨道。

于 2009-04-11T20:53:34.027 回答
1

无需向每个进程发送块数,只需使用 get_nowait() 并处理最终的 Queue.Empty 异常。每个进程都会获得不同数量的 CPU 时间,这应该让它们都处于忙碌状态。

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if name == 'main':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()
于 2009-04-13T05:20:54.220 回答
0

好的,所以看起来列表不是线程安全的,我已经开始使用队列(尽管它看起来要慢得多)。这段代码基本上完成了我想做的事情:

import math, multiprocessing

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output, chunksize):
        self.input = input
        self.output = output
        self.chunksize = chunksize
        super(Worker, self).__init__()

    def run(self):
        for x in range(self.chunksize):
            self.output.put(self.process(self.input.get()))


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))
        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        Worker(input, output, chunk + remainder).start()

    for i in range(len(dataset)):
        print output.get()
于 2009-04-13T04:25:52.777 回答