我有一个包含 120 个表的列表,我想将每个表中前 1000 行和最后 1000 行的样本大小保存到每个表的单独 csv 文件中。
如何在代码仓库或代码创作中做到这一点。
以下代码允许将一个表保存到 csv,任何人都可以帮助循环遍历项目文件夹中的表列表并为每个表创建单独的 csv 文件吗?
@transform(
my_input = Input('/path/to/input/dataset'),
my_output = Output('/path/to/output/dataset')
)
def compute_function(my_input, my_output):
my_output.write_dataframe(
my_input.dataframe(),
output_format = "csv",
options = {
"compression": "gzip"
}
)
伪代码
list_of_tables = [table1,table2,table3,...table120]
for tables in list_of_tables:
table = table.limit(1000)
table.write_dataframe(table.dataframe(),output_format = "csv",
options = {
"compression": "gzip"
})
我能够让它在一张桌子上工作,我怎样才能遍历一张桌子列表并生成它?一张表的代码
# to get the first and last rows
from transforms.api import transform_df, Input, Output
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col
table_name = 'stock'
@transform_df(
output=Output(f"foundry/sample/{table_name}_sample"),
my_input=Input(f"foundry/input/{table_name}"),
)
def compute_first_last_1000(my_input):
first_stock_df = my_input.withColumn("index", monotonically_increasing_id())
first_stock_df = first_stock_df.orderBy("index").filter(col("index") < 1000).drop("index")
last_stock_df = my_input.withColumn("index", monotonically_increasing_id())
last_stock_df = last_stock_df.orderBy("index").filter(col("index") < 1000).drop("index")
stock_df = first_stock_df.unionByName(last_stock_df)
return stock_df
# code to save as csv file
table_name = 'stock'
@transform(
output=Output(f"foundry/sample/{table_name}_sample_csv"),
my_input=Input(f"foundry/sample/{table_name}_sample"),
)
def my_compute_function(my_input, output):
df = my_input.dataframe()
with output.filesystem().open('stock.csv', 'w') as stream:
csv_writer = csv.writer(stream)
csv_writer.writerow(df.schema.names)
csv_writer.writerows(df.collect())