I am using the Apache async HTTP client to download large files from Azure Storage.
This is the sample code I am using ->
CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
httpclient.execute(request, new FutureCallback<org.apache.http.HttpResponse>() {
    @Override
    public void completed(final org.apache.http.HttpResponse httpResponse) {
    }
    @Override
    public void failed(final Exception e) {
        future.completeExceptionally(e);
    }
    @Override
    public void cancelled() {
        future.completeExceptionally(new Exception("Request cancelled"));
    }
});
However, this buffers the whole file in local memory before the completed callback is invoked.
I tried using AsyncByteConsumer ->
AsyncByteConsumer<org.apache.http.HttpResponse> consumer =
        new AsyncByteConsumer<org.apache.http.HttpResponse>() {
    @Override
    protected void onByteReceived(ByteBuffer buf, IOControl ioctrl) throws IOException {
    }
    @Override
    protected void onResponseReceived(org.apache.http.HttpResponse response) throws HttpException, IOException {
    }
    @Override
    protected org.apache.http.HttpResponse buildResult(HttpContext context) throws Exception {
        return null;
    }
};
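This did not work for me either. I get the following error -> java.lang.IllegalStateException: Content has not been provided
All I want is to get hold of the response stream so that I can pass it on to my clients, who can then download the file directly from that stream.
Edit 1 ->
So I extended AbstractAsyncResponseConsumer to write my own consumer ->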
public abstract class MyConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {
    private volatile HttpResponse response;
    private volatile SimpleInputBuffer buf;
    public MyConsumer() {
        super();
    }
    @Override
    protected void onResponseReceived(HttpResponse response) {
        this.response = response;
    }
    @Override
    protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException {
        Asserts.notNull(this.buf, "Content buffer");
        System.out.println("onContentReceived");
        buf.consumeContent(decoder);
    }
    protected abstract void onEntitySet(HttpResponse httpResponse);
    @Override
    protected void onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException {
        System.out.println("onEntityEnclosed");
        long len = entity.getContentLength();
        if (len > Integer.MAX_VALUE) {
            throw new ContentTooLongException("Entity content is too long: " + len);
        }
        if (len < 0) {
            len = 4096;
        }
        this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator());
        this.response.setEntity(new ContentBufferEntity(entity, this.buf));
        onEntitySet(this.response);
    }
    @Override
    protected HttpResponse buildResult(HttpContext context) throws Exception {
        System.out.println("buildResult");
        return response;
    }
    @Override
    protected void releaseResources() {
    }
}
This is the code I use to execute the HTTP request ->
CompletableFuture<HttpResponse> makeRequest() {
    HttpAsyncRequestProducer producer3 = HttpAsyncMethods.create(request);
    CompletableFuture<HttpResponse> future = new CompletableFuture<>();
    httpclient.execute(producer3, new MyConsumer() {
        @Override
        protected void onEntitySet(HttpResponse httpResponse) {
            // Completes as soon as the entity is attached, before any body bytes have arrived.
            future.complete(httpResponse);
        }
    },
    new FutureCallback<HttpResponse>() {
        @Override
        public void completed(HttpResponse result) {
            System.out.println("Completed" + result);
        }
        @Override
        public void failed(Exception ex) {
            // No effect if the future was already completed in onEntitySet.
            future.completeExceptionally(ex);
        }
        @Override
        public void cancelled() {
            future.completeExceptionally(new Exception("Request cancelled"));
        }
    });
    // The original snippet was missing this return statement.
    return future;
}
makeRequest().thenAccept((HttpResponse httpResponse) -> {
    try {
        System.out.println(IOUtils.toString(httpResponse.getEntity().getContent()));
    } catch (IOException e) {
        e.printStackTrace();
    }
});
I get this output ->
onEntityEnclosed
java.io.IOException: Underlying input stream returned zero bytes
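As far as I can tell, this message comes from the JDK's InputStreamReader rather than from HttpAsyncClient itself: it is raised when a bulk read on the wrapped stream returns 0 instead of blocking or returning -1. The sketch below reproduces that with the JDK alone. EmptyButOpenStream is a hypothetical stand-in, and my assumption (not verified against the library source) is that the stream backed by SimpleInputBuffer behaves like it when no content has arrived yet.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

public class ZeroByteReadDemo {

    /** Hypothetical stand-in for a non-blocking buffer that is empty but not at end of stream. */
    public static class EmptyButOpenStream extends InputStream {
        @Override
        public int read() {
            return -1; // single-byte read, not the path exercised by the reader below
        }

        @Override
        public int read(byte[] b, int off, int len) {
            return 0; // "nothing available right now", instead of blocking or signalling EOF
        }
    }

    /** Reads through an InputStreamReader; returns the IOException message, or null if none was thrown. */
    public static String readAndCaptureError() {
        try (Reader reader = new InputStreamReader(new EmptyButOpenStream(), StandardCharsets.UTF_8)) {
            reader.read(new char[16]);
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readAndCaptureError());
    }
}
```

If that assumption holds, the error would simply mean that the caller started reading before onContentReceived had put any bytes into the buffer.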
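I complete the response future as soon as I receive the onResponseReceived callback, which delivers the status and headers of the response.
What I think should happen is that the onContentReceived callback gets invoked on a separate thread, which writes the buffered data into the stream, while my caller thread reads from that stream on its own thread.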
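To make the hand-off I have in mind concrete, here is a minimal sketch using only JDK piped streams, with no HttpAsyncClient classes involved. PipedHandoffSketch, startDownload and readAll are hypothetical names; the producer thread stands in for the I/O thread that would call onResponseReceived and onContentReceived, and the chunks array stands in for the response body.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class PipedHandoffSketch {

    /**
     * Completes the returned future with a blocking stream before any body bytes exist;
     * a separate producer thread then writes the chunks into that stream.
     */
    public static CompletableFuture<InputStream> startDownload(byte[][] chunks) throws IOException {
        PipedInputStream in = new PipedInputStream(8 * 1024);
        PipedOutputStream out = new PipedOutputStream(in);
        CompletableFuture<InputStream> future = new CompletableFuture<>();

        Thread producer = new Thread(() -> {
            // Stands in for onResponseReceived: hand the stream to the caller right away.
            future.complete(in);
            try (PipedOutputStream o = out) {
                for (byte[] chunk : chunks) {
                    // Stands in for onContentReceived; blocks while the pipe is full.
                    o.write(chunk);
                }
            } catch (IOException e) {
                // The reader closed its end or went away; nothing more to deliver.
            }
        }, "producer");
        producer.setDaemon(true);
        producer.start();
        return future;
    }

    /** Drains the stream on the caller's thread; read() blocks until data or end of stream. */
    public static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
            sink.write(buf, 0, n);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        byte[][] chunks = {
            "hello ".getBytes(StandardCharsets.UTF_8),
            "world".getBytes(StandardCharsets.UTF_8)
        };
        InputStream body = startDownload(chunks).get();
        System.out.println(readAll(body));
    }
}
```

One caveat with this shape: the write blocks whenever the reader falls behind, so doing the same thing inside a real non-blocking I/O callback would stall that I/O thread. I am not sure whether that is acceptable here.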