
Here is a small code snippet:

import aiohttp
import aiofiles

async def fetch(url):
    # starting a session
    async with aiohttp.ClientSession() as session:
        # starting a get request
        async with session.get(url) as response:
            # getting response content
            content = await response.content
            return content
 
async def save_file(file_name, content):
    async with aiofiles.open(f'./binary/{file_name}', 'wb') as f:
        while True:
            chunk = content.read(1024)
            if not chunk:
                break
            f.write(chunk)

I'm trying to download some binary files with the aiohttp library and then pass them to a coroutine that uses the aiofiles library to write the files to disk. I've read the documentation, but I still can't figure out whether I can pass content = await response.content out of the function, or whether that handle is closed when the async with... block exits. On a secondary blog I found:

According to aiohttp's documentation, because the response object is created inside a context manager, it technically calls release() implicitly.

This confuses me. Should I embed the logic of the second function inside the response handle, or is my logic correct?


2 Answers


The async context manager will close the resources associated with the request, so if you return from the function, you have to make sure you've already read everything of interest. That leaves you two options:

  1. read the entire response into memory, e.g. content = await response.read(), or, if the file doesn't fit into memory (and also if you want to speed things up by reading and writing in parallel),
  2. use a queue or an async iterator to parallelize reading and writing.

Here is an untested implementation of #2:

async def fetch(url):
    # return an async generator over the contents of the URL
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # yield the response content in chunks no larger than 4K
            async for chunk in response.content.iter_chunked(4096):
                yield chunk

async def save_file(file_name, content_iter):
    async with aiofiles.open(f'./binary/{file_name}', 'wb') as f:
        async for chunk in content_iter:
            await f.write(chunk)

async def main():
    await save_file(file_name, fetch(url))
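The queue alternative mentioned in option #2 can be sketched with plain asyncio. Note this is not part of the original answer: the chunk source below is simulated in memory (standing in for response.content.iter_chunked()) and the sink is a bytearray (standing in for an aiofiles handle), so the sketch runs without any network or disk I/O:

```python
import asyncio

CHUNKS = [b"abc", b"defg", b"hi"]  # stand-in for data arriving from the server

async def producer(queue):
    # In real code this loop would live inside the aiohttp context managers,
    # so the response stays open while chunks are being read.
    for chunk in CHUNKS:
        await queue.put(chunk)
    await queue.put(None)  # sentinel: no more data

async def consumer(queue, sink):
    # In real code `sink` would be a file opened with aiofiles in 'wb' mode.
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        sink.extend(chunk)

async def main():
    # Bounded queue so a fast reader can't buffer the whole file ahead
    # of a slow writer.
    queue = asyncio.Queue(maxsize=8)
    sink = bytearray()
    await asyncio.gather(producer(queue), consumer(queue, sink))
    return bytes(sink)

result = asyncio.run(main())
```

The bounded queue gives the same backpressure the async-iterator version gets for free, at the cost of an explicit sentinel to signal end-of-stream.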
answered 2020-10-17 at 12:18

Thanks to user4815162342's code, I was able to find a solution by parallelizing the fetch and write coroutines. I would mark his code as the accepted solution, but since I had to add some code to make it work, here it is:

# fetch binary from server
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async for chunk in response.content.iter_chunked(4096):
                yield chunk

# write binary function
async def save_file(file_name, chunk_iter):
    # create the output directory tree (helpers defined elsewhere)
    list(map(create_dir_tree, list_binary_sub_dirs))
    async with aiofiles.open(f'./binary/bin_ts/{file_name}', 'wb') as f:
        async for chunk in chunk_iter:
            await f.write(chunk)
    

async def main(urls):
    tasks = []
    for url in urls:
        print('running on sublist')
        file_name = url.rpartition('/')[-1]
        request_ts = fetch(url)
        tasks.append(save_file(file_name, request_ts))
    await asyncio.gather(*tasks)

asyncio.run(main(some_list_of_urls))
answered 2020-10-17 at 18:26