我正在做一个项目,我需要从 S3 容器下载特定 URL 的爬网数据(来自 CommonCrawl),然后处理该数据。
目前我有一个 MapReduce 作业(通过 Hadoop Streaming 的 Python),它为 URL 列表获取正确的 S3 文件路径。然后我尝试使用第二个 MapReduce 作业通过从 commoncrawl S3 存储桶下载数据来处理此输出。在映射器中,我使用 boto3 从 commoncrawl S3 存储桶下载特定 URL 的 gzip 内容,然后输出有关 gzip 内容的一些信息(字数计数器信息、内容长度、链接到的 URL 等)。然后 reducer 会通过这个输出来获得最终的字数、URL 列表等。
第一个 MapReduce 作业的输出文件大小只有 6mb 左右(但一旦我们扩展到完整的数据集就会更大)。当我运行第二个 MapReduce 时,这个文件只被拆分了两次。通常,对于这么小的文件,这不是问题,但是我上面描述的映射器代码(获取 S3 数据,吐出映射的输出等)需要一段时间才能为每个 URL 运行。由于文件只拆分了两次,因此只有 2 个映射器正在运行。我需要增加拆分的数量,以便可以更快地完成映射。
我已经尝试为 MapReduce 作业设置“mapreduce.input.fileinputformat.split.maxsize”和“mapreduce.input.fileinputformat.split.minsize”,但它不会改变发生的拆分数量。
以下是映射器的一些代码:
s3 = boto3.client('s3', 'us-west-2', config=Config(signature_version=UNSIGNED))
offset_end = offset + length - 1
gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
'Body'].read()
fileobj = io.BytesIO(gz_file)
with gzip.open(fileobj, 'rb') as file:
[do stuff]
我还手动将输入文件拆分为多个文件,最多 100 行。这具有为我提供更多映射器的预期效果,但随后我开始遇到来自 s3client.get_object() 调用的 ConnectionError:
Traceback (most recent call last):
File "dmapper.py", line 103, in <module>
commoncrawl_reader(base_url, full_url, offset, length, warc_file)
File "dmapper.py", line 14, in commoncrawl_reader
gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
File "/usr/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/lib/python3.6/site-packages/botocore/client.py", line 599, in _make_api_call
operation_model, request_dict)
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 148, in make_request
return self._send_request(request_dict, operation_model)
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 177, in _send_request
success_response, exception):
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 273, in _needs_retry
caught_exception=caught_exception, request_dict=request_dict)
File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 227, in emit
return self._emit(event_name, kwargs)
File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 210, in _emit
response = handler(**kwargs)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 183, in __call__
if self._checker(attempts, response, caught_exception):
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 251, in __call__
caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 277, in _should_retry
return self._checker(attempt_number, response, caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 317, in __call__
caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 223, in __call__
attempt_number, caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
raise caught_exception
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 222, in _get_response
proxies=self.proxies, timeout=self.timeout)
File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/sessions.py", line 573, in send
r = adapter.send(request, **kwargs)
File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py", line 415, in send
raise ConnectionError(err, request=request)
botocore.vendored.requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
我目前只用少数几个 URL 运行它,但是一旦我让它工作,我需要用几千个(每个都有很多子目录)来做。
我不确定从哪里开始解决这个问题。我觉得很有可能有比我正在尝试的方法更好的方法。映射器似乎为每个 URL 花费了这么长时间的事实似乎表明我正在接近这个错误。我还应该提到,如果直接作为管道命令运行,mapper 和 reducer 都可以正常运行:
"cat short_url_list.txt | python mapper.py | sort | python reducer.py" -> 产生所需的输出,但在整个 URL 列表上运行需要很长时间。
任何指导将不胜感激。