从 python 代码调用 /triggering Luigi Task 时遇到问题。
- 基本上我需要像我们在命令行上一样触发一个 luigi 任务,但是来自 python 代码
- 我正在使用 supbrocess.popen 使用 shell 命令调用 luigi 任务
- 我有一个名为 test.py 的测试代码,并且在模块 task_scheduler.py 中有一个测试类,其中包含我的 luigi 任务(两个模块都在同一位置/目录中)
import luigi
class TestClass(luigi.Task):
# param = luigi.DictParameter(default=dict())
def requires(self):
print "I am TestClass req"
def run(self):
with open('myfile.txt', 'w') as f:
f.write("asasasas")
print "I am TestClass run"
import subprocess
p = subprocess.Popen("python -m luigi --module task_scheduler TestClass", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print p.pid
(output, err) = p.communicate()
print "-------------O/P-------------"
print output
print "-------------error-------------"
print err
但我得到的错误是
52688
-------------O/P-------------
-------------error-------------
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/luigi/retcodes.py", line 61, in run_with_retcodes
worker = luigi.interface._run(argv)['worker']
File "/Library/Python/2.7/site-packages/luigi/interface.py", line 238, in _run
return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
File "/Library/Python/2.7/site-packages/luigi/interface.py", line 172, in _schedule_and_run
not(lock.acquire_for(env_params.lock_pid_dir, env_params.lock_size, kill_signal))):
File "/Library/Python/2.7/site-packages/luigi/lock.py", line 82, in acquire_for
my_pid, my_cmd, pid_file = get_info(pid_dir)
File "/Library/Python/2.7/site-packages/luigi/lock.py", line 67, in get_info
pid_file = os.path.join(pid_dir, hashlib.md5(cmd_hash).hexdigest()) + '.pid'
TypeError: must be string or buffer, not None
谁能建议我在这里做错了什么?如果我使用 shell 提示符,命令“python -m luigi --module task_scheduler TestClass”可以完美运行