TL;DR
我想在启动时将数据集预加载到 Dask 分布式调度程序中。
背景
我以实时查询方式使用 Dask,数据集更小。因为它是实时的,所以工作人员可以相信调度程序总是有某些可用的数据集——即使在启动后立即可用,这一点很重要。工作人员始终将整个数据集保存在内存中。
传统上,我通过连接客户端、分散 df 并发布数据集来完成此操作:
df = dd.read_parquet('df.parq')
df = client.persist(df)
client.publish_dataset(flights=dfa)
但这留下了调度程序重新启动和数据集未加载的可能性。
我知道您可以使用--preload
它在启动时执行脚本,如下所示:
dask-scheduler --preload=scheduler-startup.py
样板代码如下所示:
from distributed.diagnostics.plugin import SchedulerPlugin
class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)
def dask_setup(scheduler):
plugin = MyPlugin()
scheduler.add_plugin(plugin)
但是如何说服调度程序在不使用外部客户端的情况下加载我的数据集?
从理论上讲,我可能会放弃一个启动预填充客户端的子进程,但感觉不太理想:)
调度程序启动中的普通客户端
尝试在调度程序启动中作为客户端连接:
from distributed.diagnostics.plugin import SchedulerPlugin
from dask.distributed import Client
class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)
def dask_setup(scheduler):
c = Client(scheduler.address)
df = dd.read_parquet('df.parq')
df = c.persist(df)
c.publish_dataset(flights=dfa)
挂在上面,c = Client(scheduler.address)
必须被强行杀死(kill -9
)