I will assume you have some functions is_header(line) and is_trailer(line) that tell you whether a line is a header or a trailer, respectively. Then here is how you can stream the file from S3 and save it back (a hypothetical sketch of those two functions follows the snippet).
import boto3
s3 = boto3.client("s3")
bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/def.txt"
r = s3.get_object(Bucket=bucket, Key=key)
sb = r["StreamingBody"]
content = [line for line in sb.iter_lines() if not is_header(line) and not is_trailer(line)]
content = b"".join(content)
r = s3.put_object(Bucket=bucket, Key=new_key, Bytes=content)
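For illustration, the two filter functions might look something like this; the record prefixes here are purely hypothetical, so adapt them to your file format. Note that StreamingBody.iter_lines yields bytes, so the comparisons are against bytes literals.

def is_header(line: bytes) -> bool:
    # hypothetical: header records start with a fixed tag
    return line.startswith(b"HDR")

def is_trailer(line: bytes) -> bool:
    # hypothetical: trailer records start with a fixed tag
    return line.startswith(b"TRL")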
Stream the data to avoid out-of-memory errors
The code above assumes the whole file fits in memory, which I assume it does since it is only 2 GB. If it does not, you will need to use Multipart Uploads. Here is one way to do that using the TransferManager:
from typing import Optional
import boto3
from boto3.s3.transfer import TransferConfig
import botocore
MB = 1024*1024
class FileNoHeader:
    """Wrapper for a botocore StreamingBody to filter headers/trailers"""

    def __init__(self, stream: botocore.response.StreamingBody):
        self.stream = stream
        self.first_line = True
        self.line_generator = self.stream.iter_lines()

    def read(self, size: Optional[int] = None) -> bytes:
        """Wrap StreamingBody.iter_lines to read line by line while making it look like a fileobj

        Parameters
        ----------
        size: int, optional
            How much data to read. This is a minimum amount because we are using
            StreamingBody.iter_lines to read the file line by line, so we can only
            return whole lines. If `None`, the default, read the entire file.
            This parameter is for compatibility with the read() method of a
            file-like object.
        """
        data = []
        amt = 0
        while size is None or amt < size:
            try:
                line = next(self.line_generator)
            except StopIteration:
                break
            if self.is_header(line) or self.is_trailer(line):
                continue
            # iter_lines strips the newlines, so put one back on each kept
            # line; joining with b"\n" instead would drop the newline between
            # consecutive read() calls
            line += b"\n"
            amt += len(line)
            data.append(line)
        return b"".join(data)

    def close(self):
        """Close the underlying StreamingBody"""
        self.stream.close()

    def is_header(self, line):
        # TODO: implement your logic
        # right now it just skips the first line
        if self.first_line:
            self.first_line = False
            return True
        return False

    def is_trailer(self, line):
        # TODO: implement your logic
        return False
## Usage
config = TransferConfig(multipart_chunksize=1*MB)
s3 = boto3.client("s3")
bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/abc_no_header.txt"
r = s3.get_object(Bucket=bucket, Key=key)
streaming_body = r["Body"]
data_stream = FileNoHeader(streaming_body)
def tcback(bytes_transferred):
    print(f"{bytes_transferred} bytes transferred")

s3.upload_fileobj(
    data_stream,
    bucket,
    new_key,
    Config=config,
    Callback=tcback,
)
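As a quick sanity check, you can read back the first line of the new object and confirm the header is gone:

check = s3.get_object(Bucket=bucket, Key=new_key)
print(next(check["Body"].iter_lines()))  # should be the first data line, not the header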
Sidebar: AWS Lambda
If you are using an AWS Lambda function, you can have up to 10 GB of memory. You can set the memory in the AWS console or with the API; see the documentation for boto3 and AWS CLI v2.
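For example, to raise a function's memory to the 10 GB maximum with boto3 (the function name my-func is hypothetical):

import boto3

lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="my-func",
    MemorySize=10240,  # MemorySize is in MB; 10240 MB = 10 GB
)

The AWS CLI v2 equivalent is: aws lambda update-function-configuration --function-name my-func --memory-size 10240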