3

我需要将非常大的元素提交dask.bag到非线程安全存储,即我需要类似的东西

for x in dbag:
    store.add(x)

我不能使用compute,因为袋子太大而无法放入内存。我需要更多类似的东西,distributed.as_completed但它适用于袋子,但distributed.as_completed事实并非如此。

4

1 回答 1

2

我可能会继续使用普通计算,但添加一个锁

def commit(x, lock=None):
    with lock:
        store.add(x)

b.map(commit, lock=my_lock)

您可以在哪里创建threading.Lock,或者multiprocessing.Lock取决于您正在执行的处理类型

如果您想使用 as_completed,您可以将您的包转换为期货并在其上使用 as_completed。

from distributed.client import futures_of, as_completed
b = b.persist()
futures = futures_of(b)

for future in as_completed(futures):
    for x in future.result():
        store.add(x)

您还可以转换为数据框,我相信它会更明智地进行迭代

df = b.to_dataframe(...)
for x in df.iteritems(...):
    ...
于 2017-12-19T13:44:41.420 回答