我编写了一个 python 脚本,该脚本将从 Amazon SQS 中读取,并根据用户的需要创建尽可能多的并行进程。它继承了 Django BaseCommand,这就是代码。
def handle(self, *args, **kwargs):
self.set_up(*args, **kwargs)
process_queue = JoinableQueue(self.threads)
process_pool = Pool(
self.threads,
self.worker_process,
(process_queue,)
)
is_queue_empty = False
while not is_queue_empty:
message = self.get_next_message()
if len(message) == 0:
is_queue_empty = True
else:
process_queue.put(message[0])
process_queue.join()
raise CommandError('Number retries exceeded retry limit')
def worker_process(self, process_queue):
while True:
message = process_queue.get(True)
message_tuple = (message)
self.process_message(message_tuple)
process_queue.task_done()
这工作正常,一旦任务完成,所有进程都会被杀死。但不适用于一项特定的活动,我使用锅炉管来提取一些数据。
from boilerpipe.extract import Extractor
extractor = Extractor(extractor='DefaultExtractor', html=soup_html)
extractor.getText()
当我查看boilepipe代码时,我可以看到,在Extractor的构造函数中有这段代码,
lock = threading.Lock()
class Extractor():
def __init__():
# code
try:
# code
lock.acquire()
# code
finally:
lock.release()
完整的代码是这个
- 为什么进程没有被杀死,我的多处理方式有问题吗?
- 还是这个线程锁定正在造成问题(我不确定,只是在考虑所有可能的问题)。
请指教,先谢谢了。