
The goal is to load a large number of "bulked" JSONs from S3. I found aiobotocore and wanted to try it, hoping for better efficiency while also familiarising myself with asyncio. I gave it a shot and it works, but I know basically nothing about asynchronous programming, so I was hoping for some improvements or comments. Maybe some kind souls out there can spot some obvious mistakes.

The problem is that boto3 calls are blocking, so a single thread only has one HTTP request in flight at a time. By using a thread pool I got significant improvements, but I'm hoping for a more efficient way.
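For reference, the thread-pool approach mentioned above can be sketched roughly as follows. This is a minimal illustration only: fetch_object is a hypothetical stand-in for a blocking boto3 download, so the snippet runs without AWS access.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_object(key):
    # Hypothetical stand-in for a blocking boto3 download of one key.
    return f"contents of {key}".encode()


def fetch_all(keys, max_workers=8):
    # Each worker thread blocks on its own request, so up to
    # max_workers requests are in flight at the same time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the same order as the input keys.
        return list(pool.map(fetch_object, keys))


if __name__ == "__main__":
    print(fetch_all(["a.json", "b.json"]))
```

The cost of this design is one OS thread per concurrent request, which is what an asyncio-based client tries to avoid.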

Here is the code:

Imports:

import os 
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE

A helper generator I found somewhere that yields the decoded JSON documents from a string containing several of them:

def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    '''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()
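To make the helper's behaviour concrete, here is a small self-contained usage sketch. The sample string is made up for illustration; the helper itself is the one shown above, repeated so the snippet runs on its own.

```python
import json
from json.decoder import WHITESPACE


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    string = str(string_or_fp)
    decoder = cls(**kwargs)
    # Skip leading whitespace, then decode one document at a time.
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        # Skip whitespace between documents.
        idx = WHITESPACE.match(string, end).end()


# Three JSON documents separated only by whitespace (made-up sample).
bulk = '{"a": 1} {"b": 2}\n[3]'
docs = list(iterload(bulk))
print(docs)  # [{'a': 1}, {'b': 2}, [3]]
```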

This function gets keys from an s3 bucket with a given prefix:

# Async stuff starts here
async def get_keys(loop, bucket, prefix):
    '''Get keys in bucket based on prefix'''

    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        keys = []
        # list s3 objects using paginator
        paginator = client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for c in result.get('Contents', []):
                keys.append(c['Key'])
        return keys

This function gets the content for a provided key. On top of that, it decodes the content and returns the parsed JSON documents as a list:

async def get_object(loop,bucket, key):
    '''Get json content from s3 object'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:


        # get object from s3
        response = await client.get_object(Bucket=bucket, Key=key)
        async with response['Body'] as stream:
            content = await stream.read()    

    return list(iterload(content.decode()))       

Here is the main function which gathers the contents for all the found keys and flattens the list of contents.

async def go(loop, bucket, prefix):
    '''Returns list of dicts of object contents'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:

        keys = await get_keys(loop, bucket, prefix)

        contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])     

        return list(chain.from_iterable(contents))

Finally, I run this, and the resulting list of dicts ends up nicely in result:

loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))

  • One thing that I think might be a bit weird is that I create a client in each async function. That can probably be lifted out. I'm not sure how aiobotocore works with multiple clients.

  • Furthermore, I don't think you need to wait until all keys are loaded before loading the objects for those keys, which is what this implementation does. I assume get_object could be called as soon as a key is found, so maybe get_keys should be an async generator. But I'm not entirely clear on this.
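The idea in the second point could look roughly like the sketch below. It is only an illustration of the control flow: iter_keys and fetch are hypothetical stand-ins for the paginator loop and get_object, and the semaphore limit is an arbitrary assumption.

```python
import asyncio


async def iter_keys(pages):
    # Hypothetical stand-in for the paginator: yields keys page by page.
    for page in pages:
        await asyncio.sleep(0)  # pretend to wait for one listing page
        for key in page:
            yield key


async def fetch(key, limit):
    # Hypothetical stand-in for get_object; the semaphore caps concurrency.
    async with limit:
        await asyncio.sleep(0.01)  # pretend to download
        return key.upper()


async def go(pages, max_concurrency=10):
    limit = asyncio.Semaphore(max_concurrency)
    tasks = []
    async for key in iter_keys(pages):
        # The download is scheduled as soon as the key is known,
        # while later pages are still being listed.
        tasks.append(asyncio.create_task(fetch(key, limit)))
    # gather() returns results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(go([["a", "b"], ["c"]])))
```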

Thank you in advance! Hope this helps someone in a similar situation.


1 Answer


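First, check out aioboto3.

Second, each client in aiobotocore is associated with an aiohttp session, and each session can have up to max_pool_connections connections. This is why the basic aiobotocore example uses async with on create_client: the pool is closed once you are done with the client.

Here are some tips:

  1. You should use a work pool (created by me, modularized by CaliDog) to avoid polluting your event loop. When using it, think of your workflow as a stream.
  2. This saves you from having to use asyncio.gather, which leaves the remaining tasks running in the background after the first exception is raised.
  3. You should tune your worker pool size and max_pool_connections together, and use only one client, sized for the number of tasks you want to (or, given the computation required, can) run in parallel.
  4. You really don't need to pass the loop around: in modern Python versions there is one loop per thread.
  5. You should use AWS profiles (the profile parameter of the session initializer) or environment variables, so that you don't need to hardcode key and region information.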
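The asyncio.gather behaviour described in tip 2 can be reproduced with the standard library alone. The following is a small demonstration sketch, with made-up worker names and delays, and is not part of the solution itself:

```python
import asyncio


async def worker(name, delay, finished, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    finished.append(name)


async def main():
    finished = []
    slow = asyncio.create_task(worker("slow", 0.05, finished))
    bad = asyncio.create_task(worker("bad", 0.01, finished, fail=True))
    slow_still_running = False
    try:
        await asyncio.gather(slow, bad)
    except RuntimeError:
        # gather() re-raised the first failure but did not cancel "slow".
        slow_still_running = not slow.done()
    await asyncio.sleep(0.1)  # give the leftover task time to finish
    return slow_still_running, finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the slow task is still pending when the exception reaches the caller, and it completes later in the background unless it is cancelled explicitly.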

Based on the above, this is how I would do it:

import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    # helper for parsing individual jsons from string of jsons (stolen from somewhere)
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()


async def get_object(s3_client, bucket: str, key: str):
    # Get json content from s3 object

    # get object from s3
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()

    return list(iterload(content.decode()))


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of dicts of object contents
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    async with session.create_client('s3', config=config) as client:
        worker_co = partial(get_object, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key']))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))

answered 2019-04-21T06:26:19.070