我正在使用 Apache HTTPClient 4 以默认级别访问连接到 twitter 的流 api。它在开始时运行良好,但在检索数据几分钟后,它会出现以下错误:
2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443]
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated.
at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216)
Make sure to release the connection before allocating another one.
at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190)
我明白为什么我会面临这个问题。我正在尝试在水槽集群中使用这个 HttpClient 作为水槽源。代码如下所示:
public Event next() throws IOException, InterruptedException {
try {
HttpHost target = new HttpHost("stream.twitter.com", 443, "https");
new BasicHttpContext();
HttpPost httpPost = new HttpPost("/1/statuses/filter.json");
StringEntity postEntity = new StringEntity("track=birthday",
"UTF-8");
postEntity.setContentType("application/x-www-form-urlencoded");
httpPost.setEntity(postEntity);
HttpResponse response = httpClient.execute(target, httpPost,
new BasicHttpContext());
BufferedReader reader = new BufferedReader(new InputStreamReader(
response.getEntity().getContent()));
String line = null;
StringBuffer buffer = new StringBuffer();
while ((line = reader.readLine()) != null) {
buffer.append(line);
if(buffer.length()>30000) break;
}
return new EventImpl(buffer.toString().getBytes());
} catch (IOException ie) {
throw ie;
}
}
我正在尝试将响应流中的 30,000 个字符缓冲到 StringBuffer,然后将其作为接收到的数据返回。我显然没有关闭连接 - 但我想我还不想关闭它。Twitter 的开发指南在这里谈到了这一点,内容如下:
一些 HTTP 客户端库仅在服务器关闭连接后才返回响应正文。这些客户端将无法访问 Streaming API。您必须使用将增量返回响应数据的 HTTP 客户端。大多数健壮的 HTTP 客户端库将提供此功能。例如,Apache HttpClient 将处理这个用例。
它清楚地告诉您 HttpClient 将递增地返回响应数据。我已经浏览了示例和教程,但我还没有找到任何可以做到这一点的东西。如果你们使用过 httpclient(如果不是 apache)并逐渐阅读 twitter 的流式传输 API,请告诉我您是如何实现这一壮举的。那些还没有的人,请随时为答案做出贡献。TIA。
更新
我尝试这样做:1)我将获取流句柄移至水槽源的 open 方法。2)使用简单的输入流并将数据读入字节缓冲区。所以这里是方法体现在的样子:
byte[] buffer = new byte[30000];
while (true) {
int count = instream.read(buffer);
if (count == -1)
continue;
else
break;
}
return new EventImpl(buffer);
这在一定程度上有效 - 我收到推文,它们很好地被写入目的地。问题在于 instream.read(buffer) 返回值。即使流上没有数据,并且缓冲区有默认的 \u0000 字节和 30,000 个字节,所以这个值被写入目标。所以目标文件看起来像这样..“ tweets..tweets..tweeets.. \u0000\u0000\u0000\u0000\u0000\u0000\u0000...tweets..tweets...”。我知道计数不会返回 -1 因为这是一个永无止境的流,所以我如何确定缓冲区是否有来自读取命令的新内容?