3

TL;DR
我想在启动时将数据集预加载到 Dask 分布式调度程序中。

背景
我以实时查询方式使用 Dask,数据集更小。因为它是实时的,所以工作人员可以相信调度程序总是有某些可用的数据集——即使在启动后立即可用,这一点很重要。工作人员始终将整个数据集保存在内存中。

传统上,我通过连接客户端、分散 df 并发布数据集来完成此操作:

df = dd.read_parquet('df.parq')
df = client.persist(df)
client.publish_dataset(flights=dfa)

但这留下了调度程序重新启动和数据集未加载的可能性。

我知道您可以使用--preload它在启动时执行脚本,如下所示:

dask-scheduler --preload=scheduler-startup.py

样板代码如下所示:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at", worker)

def dask_setup(scheduler):
    plugin = MyPlugin()
    scheduler.add_plugin(plugin)

但是如何说服调度程序在不使用外部客户端的情况下加载我的数据集?

从理论上讲,我可能会放弃一个启动预填充客户端的子进程,但感觉不太理想:)

调度程序启动中的普通客户端
尝试在调度程序启动中作为客户端连接:

from distributed.diagnostics.plugin import SchedulerPlugin
from dask.distributed import Client

class MyPlugin(SchedulerPlugin):
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at", worker)

def dask_setup(scheduler):
    c = Client(scheduler.address)
    df = dd.read_parquet('df.parq')
    df = c.persist(df)
    c.publish_dataset(flights=dfa)

挂在上面,c = Client(scheduler.address)必须被强行杀死(kill -9

4

2 回答 2

0

您可能会考虑将客户端代码添加到在事件循环上运行的异步函数中。这将允许预加载脚本完成,让调度程序启动,然后运行您的客户端代码。您可能需要以下内容:

async def f(scheduler):
    client =  await Client(scheduler.address)
    df = dd.read_parquet(...)
    await client.publish_dataset(flights=df)

def dask_setup(scheduler):
    scheduler.loop.add_callback(f, scheduler)
于 2017-09-29T12:37:28.983 回答
0

@MRocklin 的回答让我走上了正确的道路,但我确实需要转到另一个线程:

from concurrent.futures import ThreadPoolExecutor

def load_dataset():
    client = Client('127.0.0.1:8786')
    df = dd.read_parquet(...)
    df = client.persist(df)
    client.publish_dataset(flights=df)

async def f(scheduler):
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(load_dataset)

def dask_setup(scheduler):
    scheduler.loop.add_callback(f, scheduler)

缺点是它不会阻止工作人员在加载数据时进行连接,但我认为必须在工作人员方面进行管理(如果数据集不可用,请重试)

于 2017-10-02T15:34:39.593 回答