2

我正在使用 Apache Async Http 客户端从 Azure 存储下载大文件。

这是我正在使用的示例代码->

 CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
  httpclient.execute(request, new FutureCallback<org.apache.http.HttpResponse>() {
     @Override
     public void completed(final org.apache.http.HttpResponse httpResponse) {
     }

     @Override
     public void failed(final Exception e) {
        future.completeExceptionally(e);
     }

     @Override
     public void cancelled() {
        future.completeExceptionally(new Exception("Request cancelled"));
     }
  });

但这是在调用完成回调之前将文件存储在本地缓冲区中。

我尝试使用 AsyncByteConsumer ->

AsyncByteConsumer<org.apache.http.HttpResponse>
consumer = new AsyncByteConsumer<org.apache.http.HttpResponse>() {
     @Override
     protected void onByteReceived(ByteBuffer buf, IOControl ioctrl) throws IOException {

     }

     @Override
     protected void onResponseReceived(org.apache.http.HttpResponse response) throws HttpException, IOException {
     }

     @Override
     protected org.apache.http.HttpResponse buildResult(HttpContext context) throws Exception {
        return null;
     }
  };

这对我也不起作用。我收到以下错误 -> java.lang.IllegalStateException: Content has not been provided

我想要的只是获取我将传递给我的客户的响应流,以便他们可以使用流直接下载文件。

编辑 1 ->

所以我扩展了 AbstractAsyncResponseConsumer 来编写我自己的消费者 ->

public abstract  class MyConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {

private volatile HttpResponse response;
private volatile SimpleInputBuffer buf;

public MyConsumer() {
    super();
}

@Override
protected void onResponseReceived(HttpResponse response) {
    this.response = response;
}

@Override
protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException {
    Asserts.notNull(this.buf, "Content buffer");
    System.out.println("onContentReceived");
    buf.consumeContent(decoder);
}

protected abstract void onEntitySet(HttpResponse httpResponse);
@Override
protected void onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException {
    System.out.println("onEntityEnclosed");
    long len = entity.getContentLength();
    if (len > Integer.MAX_VALUE) {
        throw new ContentTooLongException("Entity content is too long: " + len);
    }
    if (len < 0) {
        len = 4096;
    }
    this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator());
    this.response.setEntity(new ContentBufferEntity(entity, this.buf));
    onEntitySet(this.response);


}

@Override
protected HttpResponse buildResult(HttpContext context) throws Exception {
    System.out.println("buildResult");
    return response;

}

@Override
protected void releaseResources() {
    } 
 }

这是我用来执行http请求的代码

CompletableFuture<HttpResponse> makeRequest() {
            HttpAsyncRequestProducer producer3 = HttpAsyncMethods.create(request);
            CompletableFuture<HttpResponse> future = new CompletableFuture<>();
            httpclient.execute(producer3, new MyConsumer() {
                                   @Override
                                   protected void onEntitySet(HttpResponse httpResponse) {
                                       future.complete(httpResponse);
                                   }
                               },
                               new FutureCallback<HttpResponse>() {
                @Override
                    public void completed(HttpResponse result) {
                        System.out.println("Completed" + result);
                    }

                    @Override
                    public void failed(Exception ex) {

                    }

                    @Override
                    public void cancelled() {

                    }
                });
}


     makeRequest().thenAccept((HttpResponse httpResponse) -> {
         try {
             System.out.println(IOUtils.toString(httpResponse.getEntity().getContent()));
         } catch (IOException e) {
             e.printStackTrace();
         }
     });

我得到这个输出->

onEntityEnclosed 。

java.io.IOException:基础输入流返回零字节。

一旦我收到返回响应的状态和标头的 onResponseReceived 回调,我就会完成响应的未来。

我认为应该发生的是 onContentReceived 回调将在一个单独的线程中调用,该线程会将缓冲区数据写入流,我的调用者线程可以在单独的线程中读取它。

4

1 回答 1

-1

我不确定您为什么会遇到这些问题,但以下代码片段对我有用(使用 HttpAsyncClient 4.1.3)

CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
httpClient.start();
final Future<Void> future = httpClient.execute(
        HttpAsyncMethods.createGet("http://httpbin.org/"),
        new AsyncByteConsumer<Void>() {

            @Override
            protected void onByteReceived(final ByteBuffer buf, final IOControl ioctrl) throws IOException {
                System.out.println(buf.remaining());
            }

            @Override
            protected void onResponseReceived(final HttpResponse response) throws HttpException, IOException {
                System.out.println(response.getStatusLine());
            }

            @Override
            protected Void buildResult(final HttpContext context) throws Exception {
                return null;
            }

        },
        null);
future.get();
httpClient.close();

控制台 >

HTTP/1.1 200 OK
1060
8192
3877
于 2018-05-03T12:33:13.583 回答