How can I execute a function on a CPU core, and get a callback when it has completed?


Context

I am consuming a stream:

symbols = ['ABC', 'DFG', ...]  # 52 of these

handlers = { symbol: Handler(symbol) for symbol in symbols }

async for symbol, payload in lines:  # 600M of these
    handlers[symbol].feed(payload)

I need to use multiple CPU cores to speed it up.

Each handlers['ABC'] (e.g.) holds state, but its state is disjoint from that of (e.g.) handlers['DFG'].

Basically I can't have 2 cores simultaneously running, e.g., handlers['ABC'].


My approach so far

I've come up with the following solution, but it's partly pseudocode, as I can't see how to implement it.

NCORES = 4
symbol_curr_active_on_core = [None]*NCORES

NO_CORES_FREE = -1
def first_free_core():
    for i, symbol in enumerate(symbol_curr_active_on_core):
        if not symbol:
            return i
    return NO_CORES_FREE

for symbol, payload in lines:
    # wait for avail core to handle it

    while True:
        sleep(0.001)
        if first_free_core() == NO_CORES_FREE:
            continue
        if symbol in symbol_curr_active_on_core:
            continue
        core = first_free_core()
        symbol_curr_active_on_core[core] = symbol

        cores[core].execute(
            handlers[symbol].feed(payload),
            on_complete=lambda core_index: \
                symbol_curr_active_on_core[core_index] = None
        )
        break  # dispatched; move on to the next symbol/payload pair

So my question is: how do I convert that last statement into working Python code?

        cores[core].execute(
            handlers[symbol].feed(payload),
            on_complete=lambda core_index: \
                symbol_curr_active_on_core[core_index] = None
        )

P.S. More generally, is my approach optimal?


2 Answers


The following approach should be doable, assuming that:

  1. Your Handler class can be pickled, and
  2. The Handler class does not carry so much state information that serializing it for each worker invocation becomes too costly (a quick pickle round-trip, sketched below, can check both).
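
A minimal way to sanity-check both assumptions (my own sketch, not part of the solution itself) is a timed pickle round-trip of a handler:

import pickle
import time

class Handler:  # minimal stand-in mirroring the Handler defined below
    def __init__(self, symbol):
        self.symbol = symbol
        self.counter = 0

# Assumption 1 holds if this round-trip succeeds; timing it gives a rough
# feel for the per-invocation serialization cost in assumption 2.
handler = Handler('ABC')
start = time.perf_counter()
data = pickle.dumps(handler)
restored = pickle.loads(data)
elapsed = time.perf_counter() - start
assert restored.symbol == handler.symbol
print(f"pickled size: {len(data)} bytes, round-trip: {elapsed:.6f}s")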

The main process creates a handlers dictionary whose keys are the 52 symbols and whose values are dictionaries with two keys: 'handler', whose value is the handler for the symbol, and 'processing', whose value is True or False according to whether a process is currently processing one or more payloads for that symbol.

Each process in the pool is initialized with another dictionary, queue_dict, whose keys are the 52 symbols and whose values are multiprocessing.Queue instances that will hold the payload instances to be processed for that symbol.

The main process iterates over each line of input to get the next symbol/payload pair. The payload is enqueued onto the appropriate queue for the current symbol. The handlers dictionary is then consulted to determine whether a task has already been enqueued to the processing pool for the current symbol's handler, by inspecting that symbol's processing flag. If the flag is True, nothing further needs to be done. Otherwise, the processing flag is set to True and apply_async is invoked, passing the symbol's handler as an argument.

A count of enqueued tasks (i.e. payloads) is maintained and incremented every time the main task writes a payload to one of the 52 handler queues. The worker function specified as the argument to apply_async takes its handler argument and deduces from it the queue that needs processing. For every payload it finds on the queue, it invokes the handler's feed method. It then returns a tuple consisting of the updated handler and the number of payload messages removed from the queue. The callback function for apply_async (1) updates the handler in the handlers dictionary and (2) resets the processing flag for the appropriate symbol to False. Finally, it decrements the count of enqueued tasks by the number of payload messages that were removed.

There is a small window in which the main process, having just enqueued a payload, checks whether a process is currently running this symbol's handler, sees that the processing flag is True, and on that basis does not submit a new task via apply_async, while in fact that worker has already finished processing all the payloads on its queue and is about to return, or has already returned with the callback function not yet having reset the processing flag to False. In that scenario the payload sits unprocessed on the queue until the next payload for that symbol is read from the input and processed. If there are no further input lines for that symbol, then when all tasks have completed we will be left with unprocessed payloads. We will, however, also have a non-zero count of enqueued tasks, which tells us this situation has occurred. So rather than trying to implement a complicated multiprocessing synchronization protocol, it is simpler to detect the situation and handle it by re-creating a new pool and checking each of the 52 queues.

from multiprocessing import Pool, Queue
import time
from queue import Empty
from threading import Lock

# This class needs to be Pickle-able:
class Handler:
    def __init__(self, symbol):
        self.symbol = symbol
        self.counter = 0

    def feed(self, payload):
        # For testing just increment counter by payload:
        self.counter += payload


def init_pool(the_queue_dict):
    global queue_dict
    queue_dict = the_queue_dict


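# worker() runs in a pool process; it drains every payload currently
# queued for this handler's symbol and applies them in order.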
def worker(handler):
    symbol = handler.symbol
    q = queue_dict[symbol]
    tasks_removed = 0
    while True:
        try:
            payload = q.get_nowait()
            handler.feed(payload)
            tasks_removed += 1
        except Empty:
            break
    # return updated handler:
    return handler, tasks_removed

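# Note: Pool result callbacks run in a thread of the main process,
# which is why the shared queued_tasks counter is guarded by a threading Lock.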
def callback_result(result):
    global queued_tasks
    global lock

    handler, tasks_removed = result
    # show done processing this symbol by updating handler state:
    d = handlers[handler.symbol]
    # The order of the next two statements matter:
    d['handler'] = handler
    d['processing'] = False
    with lock:
        queued_tasks -= tasks_removed

def main():
    global handlers
    global lock
    global queued_tasks

    symbols = [
        'A','B','C','D','E','F','G','H','I','J','K','L','M','AA','BB','CC','DD','EE','FF','GG','HH','II','JJ','KK','LL','MM',
        'a','b','c','d','e','f','g','h','i','j','k','l','m','aa','bb','cc','dd','ee','ff','gg','hh','ii','jj','kk','ll','mm'
    ]

    queue_dict = {symbol: Queue() for symbol in symbols}

    handlers = {symbol: {'processing': False, 'handler': Handler(symbol)} for symbol in symbols}

    lines = [
        ('A',1),('B',1),('C',1),('D',1),('E',1),('F',1),('G',1),('H',1),('I',1),('J',1),('K',1),('L',1),('M',1),
        ('AA',1),('BB',1),('CC',1),('DD',1),('EE',1),('FF',1),('GG',1),('HH',1),('II',1),('JJ',1),('KK',1),('LL',1),('MM',1),
        ('a',1),('b',1),('c',1),('d',1),('e',1),('f',1),('g',1),('h',1),('i',1),('j',1),('k',1),('l',1),('m',1),
        ('aa',1),('bb',1),('cc',1),('dd',1),('ee',1),('ff',1),('gg',1),('hh',1),('ii',1),('jj',1),('kk',1),('ll',1),('mm',1)
    ]


    def get_lines():
        # Emulate 52_000 lines:
        for _ in range(10_000):
            for line in lines:
                yield line

    POOL_SIZE = 4

    queued_tasks = 0
    lock = Lock()

    # Create pool of POOL_SIZE processes:
    pool = Pool(POOL_SIZE, initializer=init_pool, initargs=(queue_dict,))
    for symbol, payload in get_lines():
        # Put some limit on memory utilization:
        while queued_tasks > 10_000:
            time.sleep(.001)
        d = handlers[symbol]
        q = queue_dict[symbol]
        q.put(payload)
        with lock:
            queued_tasks += 1
        if not d['processing']:
            d['processing'] = True
            handler = d['handler']
            pool.apply_async(worker, args=(handler,), callback=callback_result)
    # Wait for all tasks to complete
    pool.close()
    pool.join()

    if queued_tasks:
        # Re-create pool:
        pool = Pool(POOL_SIZE, initializer=init_pool, initargs=(queue_dict,))
        for d in handlers.values():
            handler = d['handler']
            d['processing'] = True
            pool.apply_async(worker, args=(handler,), callback=callback_result)
        pool.close()
        pool.join()
        assert queued_tasks == 0

    # Print results:
    for d in handlers.values():
        handler = d['handler']
        print(handler.symbol, handler.counter)


if __name__ == "__main__":
    main()

Prints:

A 10000
B 10000
C 10000
D 10000
E 10000
F 10000
G 10000
H 10000
I 10000
J 10000
K 10000
L 10000
M 10000
AA 10000
BB 10000
CC 10000
DD 10000
EE 10000
FF 10000
GG 10000
HH 10000
II 10000
JJ 10000
KK 10000
LL 10000
MM 10000
a 10000
b 10000
c 10000
d 10000
e 10000
f 10000
g 10000
h 10000
i 10000
j 10000
k 10000
l 10000
m 10000
aa 10000
bb 10000
cc 10000
dd 10000
ee 10000
ff 10000
gg 10000
hh 10000
ii 10000
jj 10000
kk 10000
ll 10000
mm 10000
Answered 2021-07-25T13:22:03.340

This is far from the only (or even necessarily the "best") approach, but, building on my comment on your other question, here is an example of making specific child processes responsible for specific "symbols":

from multiprocessing import Process, Queue
from queue import Empty
from math import ceil

class STOPFLAG: pass

class Handler:
    def __init__(self, symbol):
        self.counter = 0 #maintain some state for each "Handler"
        self.symbol = symbol

    def feed(self, payload):
        self.counter += payload
        return self.counter

class Worker(Process):
    def __init__(self, out_q):
        self.handlers = {}
        self.in_q = Queue()
        self.out_q = out_q
        super().__init__()

    def run(self):
        while True:
            try:
                message = self.in_q.get(timeout=1)  # wait up to 1 second for the next message
            except Empty:
                pass  # put break here if you always expect symbols to be available and a timeout "shouldn't" happen
            else:
                if isinstance(message, STOPFLAG):
                    # pass back the handlers with their now modified state
                    self.out_q.put(self.handlers)
                    break
                else:
                    symbol, payload = message
                    self.handlers[symbol].feed(payload)

def main():
    n_workers = 4
    # Just 8 for testing:
    symbols = ['ABC', 'DEF', 'GHI', 'JKL', 'MNO', 'PQR', 'STU', 'VWX']

    workers = []
    out_q = Queue()
    for i in range(n_workers):
        workers.append(Worker(out_q))
    symbol_worker_mapping = {}
    for i, symbol in enumerate(symbols):
        workers[i%n_workers].handlers[symbol] = Handler(symbol)
        symbol_worker_mapping[symbol] = i%n_workers

    for worker in workers: worker.start() #start processes

    # Just a few for testing:
    lines = [
        ('ABC', 1),
        ('DEF', 1),
        ('GHI', 1),
        ('JKL', 1),
        ('MNO', 1),
        ('PQR', 1),
        ('STU', 1),
        ('VWX', 1),
        ('ABC', 1),
        ('DEF', 1),
        ('GHI', 1),
        ('JKL', 1),
        ('MNO', 1),
        ('PQR', 1),
        ('STU', 1),
        ('VWX', 1),
    ]
    #putting this loop in a thread could allow results to be collected while inputs are still being fed in.
    for symbol, payload in lines: #feed in tasks
        worker = workers[symbol_worker_mapping[symbol]] #select the correct worker
        worker.in_q.put([symbol, payload]) #pass the inputs

    results = [] #results are handler dicts from each worker
    for worker in workers:
        worker.in_q.put(STOPFLAG()) #Send stop signal to each worker
        results.append(out_q.get()) #get results (may be out of order)

    for worker in workers: worker.join() #cleanup
    for result in results:
        for symbol, handler in result.items():
            print(symbol, handler.counter)


if __name__ == "__main__":
    main()

Each child process handles a subset of the "symbols", and each child has its own dedicated input queue. This differs from a normal pool, where the children are all identical and share a single input queue, so the next available child always takes the next input. They then all put their results onto a single shared output queue back to the main process.
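
For contrast, here is a minimal sketch of the ordinary shared-queue pool pattern this answer deliberately avoids (process_line is a hypothetical stand-in for the real work): because any free worker takes the next line, two workers can be handed payloads for the same symbol concurrently, which is exactly what the per-symbol state forbids.

from multiprocessing import Pool

def process_line(symbol, payload):
    # Hypothetical stand-in for the real work. A plain pool worker keeps no
    # persistent per-symbol state, and nothing stops two workers from
    # receiving the same symbol at the same time.
    return symbol, payload

if __name__ == "__main__":
    lines = [('ABC', 1), ('DEF', 1), ('ABC', 1)]
    with Pool(4) as pool:
        results = pool.starmap(process_line, lines)  # next free worker takes the next line
    print(results)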

A completely different solution could be to keep all the state in the main process, maintain a lock for each symbol, and hold the symbol's lock from the moment the necessary state is sent to a worker until the result is received and the state in the main process has been updated.
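
That alternative is only described in prose, so here is one possible sketch of it (the run_feed helper and the choice of concurrent.futures are my own, not the answer's; Handler is assumed picklable, as in the first answer):

from concurrent.futures import ProcessPoolExecutor
from threading import Lock

class Handler:
    # Mirrors the Handler above; the authoritative copy lives in the main process.
    def __init__(self, symbol):
        self.symbol = symbol
        self.counter = 0

    def feed(self, payload):
        self.counter += payload

def run_feed(handler, payload):
    # Runs in a worker process on a pickled copy of the handler's state,
    # then ships the updated copy back to the main process.
    handler.feed(payload)
    return handler

def main():
    symbols = ['ABC', 'DEF']
    handlers = {s: Handler(s) for s in symbols}
    locks = {s: Lock() for s in symbols}          # one lock per symbol
    lines = [('ABC', 1), ('DEF', 1), ('ABC', 1)]

    with ProcessPoolExecutor(max_workers=4) as pool:
        for symbol, payload in lines:
            # Blocks if this symbol is already in flight, so no two workers
            # ever hold the same symbol's state at once.
            locks[symbol].acquire()
            future = pool.submit(run_feed, handlers[symbol], payload)

            def on_done(fut, symbol=symbol):
                handlers[symbol] = fut.result()   # write the updated state back
                locks[symbol].release()           # the symbol may be dispatched again
            future.add_done_callback(on_done)

    for s in symbols:
        print(s, handlers[s].counter)

if __name__ == "__main__":
    main()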

Answered 2021-07-23T20:25:05.183