2012-03-16 35 views
8

Tôi mới bắt đầu tìm kiếm theo cách của mình. Tôi đang tìm cách tạo một máy khách http hoạt động không đồng bộ. Các ví dụ nhỏ của http chỉ hiển thị cách đợi các hoạt động IO và không phải cách sử dụng addListener và vì vậy tôi đã cố gắng tìm ra điều này trong vài ngày qua.Ứng dụng HTTP không đồng bộ với Netty

Tôi đang cố tạo lớp yêu cầu sẽ xử lý tất cả các trạng thái khác nhau của yêu cầu, từ kết nối, gửi dữ liệu, xử lý phản hồi và sau đó đóng kết nối. Để làm điều đó, lớp của tôi mở rộng SimpleChannelUpstreamHandler và triển khai ChannelFutureListener. Tôi sử dụng ChannelPipelineFactory để thêm (ví dụ) lớp này (dưới dạng SimpleChannelUpstreamHandler) vào đường ống dưới dạng trình xử lý.

Kết nối được tạo ra như thế này:

this.state = State.Connecting; 
this.clientBootstrap.connect(this.address).addListener(this); 

Sau đó operationComplete phương pháp:

@Override 
public void operationComplete(ChannelFuture future) throws Exception { 
    State oldState = this.state; 

    if (!future.isSuccess()) { 
     this.status = Status.Failed; 
     future.getChannel().disconnect().addListener(this); 
    } 
    else if (future.isCancelled()) { 
     this.status = Status.Canceled; 
     future.getChannel().disconnect().addListener(this); 
    } 
    else switch (this.state) { 
     case Connecting: 
      this.state = State.Sending; 
      Channel channel = future.getChannel(); 
      channel.write(this.createRequest()).addListener(this); 
      break; 

     case Sending: 
      this.state = State.Disconnecting; 
      future.getChannel().disconnect().addListener(this); 
      break; 

     case Disconnecting: 
      this.state = State.Closing; 
      future.getChannel().close().addListener(this); 
      break; 

     case Closing: 
      this.state = State.Finished; 
      break; 
    } 
    System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status); 
} 

private HttpRequest createRequest() { 
    String url = this.url.toString(); 

    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); 
    request.setHeader(HttpHeaders.Names.HOST, this.url.getHost()); 
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); 
    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); 

    return request; 
} 

Lớp cũng đè messageReceived phương pháp:

@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
    System.out.println("messageReceived"); 
    HttpResponse response = (HttpResponse) e.getMessage(); 

    ChannelBuffer content = response.getContent(); 
    if (content.readable()) { 
     System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8)); 
    } 
} 

Vấn đề là tôi nhận được kết quả này:

request operationComplete start state: Connecting, end state: Sending, status: Unknown 
request operationComplete start state: Sending, end state: Disconnecting, status: Unknown 
request operationComplete start state: Closing, end state: Finished, status: Unknown 
request operationComplete start state: Disconnecting, end state: Finished, status: Unknown 

Như bạn có thể thấy messageReceived của không được thực hiện đối với một số lý do, mặc dù các nhà máy đường ống dẫn thêm trường hợp của lớp này đến các đường ống dẫn.

Bất kỳ ý tưởng nào tôi thiếu ở đây? Cảm ơn.


Sửa

tôi quản lý để cuối cùng có được điều này nhờ làm việc để sự giúp đỡ của @JestanNirojan, trong trường hợp ai đó sẽ quan tâm đến các giải pháp:

public class ClientRequest extends SimpleChannelUpstreamHandler { 

    .... 

    public void connect() { 
     this.state = State.Connecting; 
     System.out.println(this.state); 
     this.clientBootstrap.connect(this.address); 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     this.state = State.Sending; 
     System.out.println(this.state); 
     ctx.getChannel().write(this.createRequest()); 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     HttpResponse response = (HttpResponse) e.getMessage(); 

     ChannelBuffer content = response.getContent(); 
     if (content.readable()) { 
      System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8)); 
     } 

     this.state = State.Disconnecting; 
     System.out.println(this.state); 
    } 

    @Override 
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     this.state = State.Closing; 
     System.out.println(this.state); 
    } 

    @Override 
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     this.state = State.Finished; 
     System.out.println(this.state); 
    } 

    private HttpRequest createRequest() { 
     String url = this.url.toString(); 

     HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); 
     request.setHeader(HttpHeaders.Names.HOST, this.url.getHost()); 
     request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); 
     request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); 

     return request; 
    } 
} 
+0

là HttpResponse toàn bộ HttpResponse hoặc có thể là một đoạn không?Tôi có 1000 của khối trở lại và muốn có một sự kiện cho mỗi đoạn hoặc bộ nhớ sẽ phát nổ dẫn đến trong bộ nhớ. –

+0

HttpResponse là câu trả lời đầy đủ, bạn không thể chunk nó theo như tôi biết. Bạn nên đi thấp hơn sau đó, có thể với [HttpResponseDecoder] (http://static.netty.io/3.5/api/org/jboss/netty/handler/codec/http/HttpResponseDecoder.html). –

+0

Nếu bạn không xen kẽ trong chunking, hãy sử dụng máy khách http ánh sáng ở đây @ https://github.com/arungeorge81/netty-http-client –

Trả lời

3

Bạn đang sử dụng một ChannelFutureListener để thực hiện tất cả các hoạt động trong kênh (điều này là xấu) và người nghe trong tương lai sẽ được thực hiện ngay sau khi gọi các hoạt động kênh đó.

Sự cố là, Sau khi gửi tin nhắn, kênh sẽ bị ngắt kết nối ngay lập tức và trình xử lý không thể nhận được thông báo phản hồi đến sau.

 ........ 
    case Sending: 
     this.state = State.Disconnecting; 
     future.getChannel().disconnect().addListener(this); 
     break; 
     ........ 

bạn không được chặn luồng trong tương lai. Cách tiếp cận tốt nhất là mở rộng các phương thức

channelConnected(..) {} 
    messageReceived(..) {} 
    channelDisconnected(..) {} 

của SimpleChannelUpstreamHandler và phản ứng lại các sự kiện đó. bạn có thể giữ trạng thái trong trình xử lý đó.

+1

Oh. Điều đó thật đơn giản. Cảm ơn rất nhiều thông tin, tôi ước Netty có tài liệu tốt hơn về điều này. –

Các vấn đề liên quan