1

我有一个包含 120 个表的列表,我想将每个表中前 1000 行和最后 1000 行的样本大小保存到每个表的单独 csv 文件中。

如何在代码仓库或代码创作中做到这一点。

以下代码允许将一个表保存到 csv,任何人都可以帮助循环遍历项目文件夹中的表列表并为每个表创建单独的 csv 文件吗?

@transform(
    my_input = Input('/path/to/input/dataset'),
    my_output = Output('/path/to/output/dataset')
)
def compute_function(my_input, my_output):
    my_output.write_dataframe(
        my_input.dataframe(),
        output_format = "csv",
        options = {
            "compression": "gzip"
        }
    )

伪代码

list_of_tables = [table1,table2,table3,...table120]
for tables in list_of_tables:
    table = table.limit(1000)
    table.write_dataframe(table.dataframe(),output_format = "csv",
        options = {
            "compression": "gzip"
        })

我能够让它在一张桌子上工作,我怎样才能遍历一张桌子列表并生成它?一张表的代码

# to get the first and last rows 
from transforms.api import transform_df, Input, Output 
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col


table_name = 'stock'
@transform_df(
    output=Output(f"foundry/sample/{table_name}_sample"),
    my_input=Input(f"foundry/input/{table_name}"),
)
def compute_first_last_1000(my_input):
    first_stock_df = my_input.withColumn("index", monotonically_increasing_id())
    first_stock_df = first_stock_df.orderBy("index").filter(col("index") < 1000).drop("index")
    last_stock_df = my_input.withColumn("index", monotonically_increasing_id())
    last_stock_df = last_stock_df.orderBy("index").filter(col("index") < 1000).drop("index")
    stock_df = first_stock_df.unionByName(last_stock_df)
    return stock_df

# code to save as csv file
table_name = 'stock'

@transform(
        output=Output(f"foundry/sample/{table_name}_sample_csv"),
        my_input=Input(f"foundry/sample/{table_name}_sample"),
)

def my_compute_function(my_input, output):
    df = my_input.dataframe()
    with output.filesystem().open('stock.csv', 'w') as stream:
        csv_writer = csv.writer(stream)
        csv_writer.writerow(df.schema.names)
        csv_writer.writerows(df.collect())
4

2 回答 2

1

如果您需要读取表名而不是对其进行硬编码,则可以使用os.listdiroros.walk方法。

我认为先前的答案遗漏了仅导出第一行和最后 N 行的部分。如果将表转换为数据框df,则

dfoutput = df.head(N).append(df.tail(N)])

或者

dfoutput = df[:N].append(df[-N:])
于 2021-11-15T04:48:02.503 回答
1

您最好的策略是以编程方式生成转换,如果您不想创建 1000 个转换,也可以进行多输出转换。像这样的东西(写在答案框中,未经测试的代码可能是错误的):

# you can generate this programatically
my_inputs = [
   '/path/to/input/dataset1',
   '/path/to/input/dataset2',
   '/path/to/input/dataset3',
   # ...
]

for table_path in my_inputs:
   @transform_df(
      Output(table_path + '_out'),
      df=Input(table_path))
   def transform(df):
       # your logic here
       return df

于 2021-11-16T13:58:06.687 回答