我正在对 dask 数据帧(dask dataframe)运行一组类似 pandas 的转换,使用 `distributed` 调度器,在我自己的机器上运行——因此使用与我电脑的 8 个内核相对应的 8 个 worker(工作进程)。
我有一个分布式客户端的默认设置:
from dask.distributed import Client
c = Client()
该过程使用少量数据(1000 条记录)可以成功运行,但是当我将其仅略微扩展到 7500 条记录时,就收到如下错误(注意这实际上是 ERROR 级别的异常回溯,而不只是警告):
tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:58103, threads: 1>>
Traceback (most recent call last):
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 348, in catch_zombie
yield
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 387, in _get_pidtaskinfo
ret = cext.proc_pidtaskinfo_oneshot(self.pid)
ProcessLookupError: [Errno 3] No such process
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/ioloop.py", line 1026, in _run
return self.callback()
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/nanny.py", line 245, in memory_monitor
memory = psutil.Process(self.process.pid).memory_info().rss
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_common.py", line 337, in wrapper
return fun(self)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/__init__.py", line 1049, in memory_info
return self._proc.memory_info()
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 330, in wrapper
return fun(self, *args, **kwargs)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 456, in memory_info
rawtuple = self._get_pidtaskinfo()
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_common.py", line 337, in wrapper
return fun(self)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 387, in _get_pidtaskinfo
ret = cext.proc_pidtaskinfo_oneshot(self.pid)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/Users/user1/anaconda3/envs/ldaenv/lib/python3.6/site-packages/psutil/_psosx.py", line 361, in catch_zombie
raise AccessDenied(proc.pid, proc._name)
psutil._exceptions.AccessDenied: psutil.AccessDenied (pid=17998)
当 dask 尝试重新启动该计算块时,上述错误会重复出现多次。在达到配置文件中指定的失败次数后,最终会抛出 KilledWorker 错误,例如下面这样。我已经尝试过不同长度的数据,KilledWorker 有时发生在 melt 任务上,有时发生在 apply 任务上。
KilledWorker Traceback (most recent call last)
<ipython-input-28-7ba288919b51> in <module>()
1 #Optional checkpoint to view output
2 with ProgressBar():
----> 3 output = aggdf.compute()
4 output.head()
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
133 dask.base.compute
134 """
--> 135 (result,) = compute(self, traverse=False, **kwargs)
136 return result
137
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
331 postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
332 else (None, a) for a in args]
--> 333 results = get(dsk, keys, **kwargs)
334 results_iter = iter(results)
335 return tuple(a if f is None else f(next(results_iter), *a)
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
1997 secede()
1998 try:
-> 1999 results = self.gather(packed, asynchronous=asynchronous)
2000 finally:
2001 for f in futures.values():
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1435 return self.sync(self._gather, futures, errors=errors,
1436 direct=direct, local_worker=local_worker,
-> 1437 asynchronous=asynchronous)
1438
1439 @gen.coroutine
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
590 return future
591 else:
--> 592 return sync(self.loop, func, *args, **kwargs)
593
594 def __repr__(self):
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
252 e.wait(1000000)
253 if error[0]:
--> 254 six.reraise(*error[0])
255 else:
256 return result[0]
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/utils.py in f()
236 yield gen.moment
237 thread_state.asynchronous = True
--> 238 result[0] = yield make_coro()
239 except Exception as exc:
240 logger.exception(exc)
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1313 six.reraise(type(exception),
1314 exception,
-> 1315 traceback)
1316 if errors == 'skip':
1317 bad_keys.add(key)
~/anaconda3/envs/ldaenv/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
KilledWorker: ("('melt-b85b6b6b1aee5b5aaa8d24db1de65b8a', 0)", 'tcp://127.0.0.1:58108')
我对 `distributed` 或 `tornado` 这两个包,以及底层创建和终止进程的架构都不太熟悉——有没有人能给我指出调试或解决这个问题的正确方向?
与此同时,我暂时改用 dask 数据帧默认的多线程调度器(不使用 distributed),它可以成功处理更大规模的数据。