0

我在 Python 中做多线程,代码片段如下:

threads = [threading.Thread(target=process_data, args=(col_name)) for index, col_name in enumerate(df.dtypes)]
for t in threads:
    t.start()
for t in threads:
    t.join()

def process_data(column_name):
    max_value_df = spark.sql('SELECT MAX({column_name}) AS {column_name} FROM table'.format(column_name = column_name))
    time.sleep(1)
    max_value = max_value_df.first()[column_name]

该代码基本上是获取表中所有列的最大值。所以,我使用线程来获取每一列的最大值。因此,考虑我的表有 5 列:a、b、c、d、e,因此每个线程将处理其中一列。问题是对于某些列,我收到错误:ValueError: 'a' is not in list显示max_value_df其结果中没有特定的列名。这种行为是非常随机的。有时我完全没有错误,有时它是五列中的任何一列。请提出这里的问题以及我该如何解决。我尝试添加time.sleep(1),以便在线程返回结果之前等待,但问题仍然存在。谢谢。

4

1 回答 1

0

我认为当 max_value_df.first() 是列表时会引发 ValueError ,您可以检查此类型以找出 max_value_df.first() 返回列表的原因和时间

于 2021-02-18T09:38:45.107 回答