2

自大约 3 个月以来,我目前一直在努力解决这个问题。Crawler 似乎每 10 分钟获取一次页面,但在这之间似乎什么都不做。总体吞吐量非常慢。我正在并行抓取 300 个域。这应该使大约 30 页/秒,爬行延迟 10 秒。目前它大约是每秒 2 页。

拓扑在 PC 上运行

  • 8GB内存
  • 普通硬盘
  • 酷睿处理器
  • Ubuntu 16.04

Elasticsearch 安装在具有相同规格的另一台机器上。

在这里你可以看到来自 Grafana Dashboard 的指标

Grafana 仪表板

它们还反映在 Storm UI 中看到的进程延迟中:

风暴用户界面

我目前的 Stormcrawler 架构是:

spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout"
    parallelism: 25

bolts:
  - id: "partitioner"
    className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
    parallelism: 1
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 6
  - id: "sitemap"
    className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
    parallelism: 1
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    parallelism: 1
  - id: "index"
    className: "de.hpi.bpStormcrawler.BPIndexerBolt"
    parallelism: 1
  - id: "status"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
    parallelism: 4
  - id: "status_metrics"
    className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
    parallelism: 1

使用配置(这里是最相关的部分):

config:
  topology.workers: 1
  topology.message.timeout.secs: 300
  topology.max.spout.pending: 100
  topology.debug: false

  fetcher.threads.number: 50

  worker.heap.memory.mb: 2049
  partition.url.mode: byDomain

  fetcher.server.delay: 10.0

这里是风暴配置(也只是相关部分):

nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"

ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

supervisor.childopts: "-Djava.net.preferIPv4Stack=true"

worker.childopts: "-Xmx1500m -Djava.net.preferIPv4Stack=true"

你知道可能是什么问题吗?还是只是硬件的问题?

我已经尝试过的

  • 将 fetcher.server.delay 增加到一个更高和更低的值,这并没有改变任何东西
  • 减少和增加 fetcher 线程的数量
  • 玩弄并行性
  • 计算是否为网络带宽。带宽为 400mbit/s,平均页面大小为 0.5 MB,它将是 15MB/s,这将是 120mbit/s,这也不应该是问题
  • 增加工人数量

你还有什么我应该检查的想法或可以解释缓慢获取的原因吗?也许它也只是缓慢的硬件?或者瓶颈是 Elasticsearch?

非常感谢您提前

编辑:

我将拓扑更改为两个工作人员并且经常出现错误

2018-07-03 17:18:46.326 c.d.s.e.p.AggregationSpout Thread-33-spout-executor[26 26] [INFO] [spout #12]  Populating buffer with nextFetchDate <= 2018-06-21T17:52:42+02:00
2018-07-03 17:18:46.327 c.d.s.e.p.AggregationSpout I/O dispatcher 26 [ERROR] Exception with ES query
java.io.IOException: Unable to parse response body for Response{requestLine=POST /status/status/_search?typed_keys=true&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&preference=_shards%3A12&search_type=query_then_fetch&batched_reduce_size=512 HTTP/1.1, host=http://ts5565.byod.hpi.de:9200, response=HTTP/1.1 200 OK}
	at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:548) [stormjar.jar:?]
	at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:600) [stormjar.jar:?]
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:355) [stormjar.jar:?]
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346) [stormjar.jar:?]
	at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119) [stormjar.jar:?]
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177) [stormjar.jar:?]
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436) [stormjar.jar:?]
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326) [stormjar.jar:?]
	at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) [stormjar.jar:?]
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) [stormjar.jar:?]
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) [stormjar.jar:?]
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) [stormjar.jar:?]
	at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) [stormjar.jar:?]
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) [stormjar.jar:?]
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) [stormjar.jar:?]
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) [stormjar.jar:?]
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [stormjar.jar:?]
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) [stormjar.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.NullPointerException

抓取过程仍然看起来更加平衡,但仍然没有获取很多链接

在此处输入图像描述

同样在运行拓扑几个星期后,延迟上升了很多

在此处输入图像描述

4

2 回答 2

2

当您不使用ack()所有(自定义)螺栓fail()中的所有元组时,就会发生这种情况。此外,当从中间螺栓发射时,请确保将新元组正确锚定到前一个元组。有关解释,请参阅此 SO 答案

我有同样的问题,也有一个 ES 设置。我最初认为这是关于nextFetchDateES Spout 的逻辑,但在禁用我的自定义螺栓后,问题得到了解决。所以我发现我错过ack()了某个地方的元组。这导致未确认的元组在拓扑中丢失,然后将在 10 分钟/600 秒后重新发出。

虽然我没有找到定义 10 分钟重播超时的位置,因为风暴重播机制通过以下方式设置为 5 分钟 topology.message.timeout.secs: 300

于 2018-07-04T13:01:52.637 回答
2

抱歉回复晚了,刚放假回来。

从图表来看,worker 被重新启动,这让我认为某些东西正在阻塞或崩溃拓扑。在没有任何反应的一段时间后,工作人员重新启动,它处理了一些 URL,问题再次发生。

您是否检查了日志中的错误消息?日志中是否有内存转储?你能隔离导致问题的 URL 吗?

于 2018-07-02T09:32:49.170 回答