trước tiên, hãy để tôi giải thích ngữ cảnh:Gửi nhiều yêu cầu asynchonous trên máy khách Netty
Tôi phải tạo một ứng dụng khách sẽ gửi nhiều yêu cầu HTTP để tải xuống hình ảnh. Các yêu cầu này phải không đồng bộ vì ngay sau khi hình ảnh được hoàn thành, yêu cầu sẽ được thêm vào hàng đợi và sau đó in ra màn hình. Bởi vì hình ảnh có thể lớn và phản hồi chunked, xử lý của tôi phải tổng hợp nó thành một bộ đệm.
Vì vậy, tôi thực hiện theo các mã ví dụ Netty (HTTP spoon example).
Hiện tại, tôi có ba Bản đồ tĩnh để lưu trữ cho mỗi kênh ID kênh và bộ đệm/chunk boolean/đối tượng cuối cùng của tôi.
private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>();
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>();
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>();
Sau đó, tôi tạo ứng dụng khách khởi động và truy cập countDown số lượng yêu cầu đang chờ xử lý. Hàng đợi cuối cùng và bộ đếm được chuyển tới Trình xử lý của tôi khi hình ảnh phản hồi hoàn tất.
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 30000);
final CountDownLatch latch = new CountDownLatch(downloadList.size()) {
@Override
public void countDown() {
super.countDown();
if (getCount() <= 0) {
try {
queue.put(END_OF_QUEUE);
bootstrap.releaseExternalResources();
} catch (InterruptedException ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
}
}
};
bootstrap.getPipeline().addLast("codec", new HttpClientCodec());
bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch));
Sau đó, tôi tạo kênh cho mỗi hình ảnh để tải xuống và khi kênh được kết nối, yêu cầu sẽ được tạo và gửi. Máy chủ và cổng đã được trích xuất trước đó.
for (final ImagePack pack : downloadList) {
final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture cf) throws Exception {
final Channel channel = future.getChannel();
PACK_MAP.put(channel.getId(), pack);
final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url);
request.setHeader(HttpHeaders.Names.HOST, host);
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES);
if (channel.isWritable()) {
channel.write(request);
}
}
});
}
Bây giờ, đây là ChannelHandler của tôi là lớp bên trong mở rộng SimpleChannelUpstreamHandler
. Khi kênh được kết nối, một mục mới trong BUFFER_MAP
và trong CHUNKS_MAP
được tạo. BUFFER_MAP
chứa tất cả các bộ đệm hình ảnh được sử dụng bởi trình xử lý để tổng hợp các khối hình ảnh từ các kênh và CHUNKS_MAP
chứa boolean được xử lý phản hồi. Khi phản hồi hoàn tất, hình ảnh InputSteam
được thêm vào hàng đợi, chốt đếm ngược và kênh đã đóng.
private class TileClientHandler extends SimpleChannelUpstreamHandler {
private CancellableQueue<Object> queue;
private CountDownLatch latch;
public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) {
this.queue = queue;
this.latch = latch;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
super.writeComplete(ctx, e);
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final Integer channelID = ctx.getChannel().getId();
if (!CHUNKS_MAP.get(channelID)) {
final HttpResponse response = (HttpResponse) e.getMessage();
if (response.isChunked()) {
CHUNKS_MAP.put(channelID, true);
} else {
final ChannelBuffer content = response.getContent();
if (content.readable()) {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(content);
BUFFER_MAP.put(channelID, buf);
messageCompleted(e);
}
}
} else {
final HttpChunk chunk = (HttpChunk) e.getMessage();
if (chunk.isLast()) {
CHUNKS_MAP.put(channelID, false);
messageCompleted(e);
} else {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(chunk.getContent());
BUFFER_MAP.put(channelID, buf);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
latch.countDown();
e.getChannel().close();
}
private void messageCompleted(MessageEvent e) {
final Integer channelID = e.getChannel().getId();
if (queue.isCancelled()) {
return;
}
try {
final ImagePack p = PACK_MAP.get(channelID);
final ChannelBuffer b = BUFFER_MAP.get(channelID);
p.setBuffer(new ByteArrayInputStream(b.array()));
queue.put(p.getTile());
} catch (Exception ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
latch.countDown();
e.getChannel().close();
}
}
Vấn đề của tôi là, khi tôi thực thi mã này, tôi đã có những trường hợp ngoại lệ:
java.lang.IllegalArgumentException: invalid version format: 3!}@
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
java.lang.IllegalArgumentException: invalid version format:
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format:
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread.
at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71)
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171)
at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324)
at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Và cũng có một số NPE xuất hiện một số lần.
java.lang.NullPointerException
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
Tất cả mã này hoạt động tốt cho một yêu cầu nhưng một số nội dung lạ sẽ thêm vào bộ đệm khi có nhiều yêu cầu gửi.
Bất kỳ ý tưởng nào tôi thiếu ở đây? Cảm ơn.
Trong phiên bản đầu tiên, tôi sao chép bootstrap/xử lý cho mỗi hình ảnh được yêu cầu, nó hoạt động tốt nhưng không được tối ưu hóa.
Xin chào johnstlr, cảm ơn câu trả lời hữu ích nhanh này, bây giờ tôi sử dụng ChannelPipelineFactory để khởi tạo HTTPCodec và xử lý Tile của tôi. Nó hoạt động tốt, nhưng tôi vẫn còn có 'java.lang.IllegalStateException: Một Executor không thể được tắt từ thread được lấy từ chính nó. Vui lòng đảm bảo rằng bạn không gọi releaseExternalResources() từ một ngoại lệ thread.' của công nhân I/O. Bạn có ý tưởng về điều đó không? Và đối với thông tin, lý do mà tôi đã không sử dụng một HttpChunkAggregator là bạn phải thiết lập một kích thước bộ đệm để xây dựng HttpChunkAggregator. – qboileau
Bạn đang gọi bootstrap.releaseExternalResources từ bên trong CountDownLatch.countDown, được gọi từ một chuỗi IO trong các phương thức xử lý của bạn. Thật không may bạn không thể làm điều này. Bạn cần phải gọi releaseExternalResources từ một luồng không nằm trong nhóm luồng được Netty sử dụng. Một tùy chọn có thể là gọi releaseExternalResources trong chuỗi của bạn đang đọc từ hàng đợi nội bộ của bạn khi nó đã xử lý xong hàng đợi. Ngoài ra, bạn hoàn toàn đúng về HttpChunkAggregator. Lấy làm tiếc! – johnstlr