2015-09-24 13 views
5

Tôi tin rằng câu hỏi này không trùng lặp với Server sent event with Jersey: EventOutput is not closed after client drops, nhưng có thể liên quan đến Jersey Server-Sent Events - write to broken connection does not throw exception.Phát sóng bằng Jersey SSE: Phát hiện kết nối đã đóng

Trong chapter 15.4.2 của tài liệu Jersey, các SseBroadcaster được mô tả:

Tuy nhiên, SseBroadcaster nội bộ xác định và cũng xử lý ngắt kết nối khách hàng. Khi khách hàng đóng kết nối, đài phát hiện phát hiện điều này và loại bỏ kết nối cũ khỏi bộ sưu tập nội bộ của EventOutputs đã đăng ký cũng như giải phóng tất cả tài nguyên phía máy chủ được kết hợp với kết nối cũ.

Tôi không thể xác nhận điều này. Trong testcase sau, tôi thấy phương thức onClose() của lớp con chưa bao giờ được gọi: không phải khi nào EventInput đóng và không phải khi một tin nhắn khác được phát.

public class NotificationsResourceTest extends JerseyTest { 
    final static Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class); 

    final static CountingSseBroadcaster broadcaster = new CountingSseBroadcaster(); 

    public static class CountingSseBroadcaster extends SseBroadcaster { 
     final AtomicInteger connectionCounter = new AtomicInteger(0); 

     public EventOutput createAndAttachEventOutput() { 
      EventOutput output = new EventOutput(); 
      if (add(output)) { 
       int cons = connectionCounter.incrementAndGet(); 
       log.debug("Active connection count: "+ cons); 
      } 
      return output; 
     } 

     @Override 
     public void onClose(final ChunkedOutput<OutboundEvent> output) { 
      int cons = connectionCounter.decrementAndGet(); 
      log.debug("A connection has been closed. Active connection count: "+ cons); 
     } 

     @Override 
     public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) { 
      log.trace("An exception has been detected", exception); 
     } 

     public int getConnectionCount() { 
      return connectionCounter.get(); 
     } 
    } 

    @Path("notifications") 
    public static class NotificationsResource { 

     @GET 
     @Produces(SseFeature.SERVER_SENT_EVENTS) 
     public EventOutput subscribe() { 
      log.debug("New stream subscription"); 

      EventOutput eventOutput = broadcaster.createAndAttachEventOutput(); 
      return eventOutput; 
     } 
    } 

    @Override 
    protected Application configure() { 
     ResourceConfig config = new ResourceConfig(NotificationsResource.class); 
     config.register(SseFeature.class); 

     return config; 
    } 


    @Test 
    public void test() throws Exception { 
     // check that there are no connections 
     assertEquals(0, broadcaster.getConnectionCount()); 

     // connect subscriber 
     log.info("Connecting subscriber"); 
     EventInput eventInput = target("notifications").request().get(EventInput.class); 
     assertFalse(eventInput.isClosed()); 

     // now there are connections 
     assertEquals(1, broadcaster.getConnectionCount()); 

     // push data 
     log.info("Broadcasting data"); 
     String payload = UUID.randomUUID().toString(); 
     OutboundEvent chunk = new OutboundEvent.Builder() 
       .mediaType(MediaType.TEXT_PLAIN_TYPE) 
       .name("message") 
       .data(payload) 
       .build(); 
     broadcaster.broadcast(chunk); 

     // read data 
     log.info("Reading data"); 
     InboundEvent inboundEvent = eventInput.read(); 
     assertNotNull(inboundEvent); 
     assertEquals(payload, inboundEvent.readData()); 

     // close subscription 
     log.info("Closing subscription"); 
     eventInput.close(); 
     assertTrue(eventInput.isClosed()); 

     // at this point, the subscriber has disconnected itself, 
     // but jersey doesnt realise that 
     assertEquals(1, broadcaster.getConnectionCount()); 

     // wait, give TCP a chance to close the connection 
     log.debug("Sleeping for some time"); 
     Thread.sleep(10000); 

     // push data again, this should really flush out the not-connected client 
     log.info("Broadcasting data again"); 
     broadcaster.broadcast(chunk); 
     Thread.sleep(100); 

     // there is no subscriber anymore 
     assertEquals(0, broadcaster.getConnectionCount()); // FAILS! 
    } 
} 

Có thể JerseyTest không phải là cách hay để kiểm tra điều này. Trong một ... thiết lập lâm sàng ít hơn, nơi một JavaScript EventSource được sử dụng, tôi thấy onClose() được gọi, nhưng chỉ sau khi một tin nhắn được phát sóng trên kết nối đã đóng trước đó.

Tôi đang làm gì sai?

Tại sao không SseBroadcaster phát hiện việc đóng kết nối của khách hàng?

Follow-up

tôi đã tìm thấy JERSEY-2833 đó đã bị từ chối với trình như thiết kế:

Theo Tài liệu Jersey trong SSE chương (https://jersey.java.net/documentation/latest/sse.html) trong 15.4.1 nó đã đề cập rằng Jersey không đóng kết nối một cách rõ ràng, đó là trách nhiệm của phương thức tài nguyên hoặc ứng dụng khách.

Điều đó có nghĩa là gì? Tài nguyên có nên thực hiện hết thời gian chờ và hủy tất cả các kết nối đang hoạt động và đóng-bởi-khách hàng không?

Trả lời

0

tôi tin rằng nó có thể là tốt hơn để thiết lập một thời gian chờ trên tài nguyên của bạn và tiêu diệt chỉ kết nối đó, ví dụ:

@Path("notifications") 
public static class NotificationsResource { 

    @GET 
    @Produces(SseFeature.SERVER_SENT_EVENTS) 
    public EventOutput subscribe() { 
     log.debug("New stream subscription"); 

     EventOutput eventOutput = broadcaster.createAndAttachEventOutput(); 
     new Timer().schedule(new TimerTask() 
     { 
      @Override public void run() 
      { 
       eventOutput.close() 
      } 
     }, 10000); // 10 second timeout 
     return eventOutput; 
    } 
} 
+0

Cảm ơn cho đầu vào. Tôi chắc chắn rằng điều này sẽ làm việc, và nó có lợi ích của nó, nhưng nó không phải là những gì tôi sau. Cách giải quyết hiện tại của tôi là gửi nhận xét SSE mỗi x giây, điều này sẽ xóa bỏ một kết nối bị hỏng sau cùng. Câu hỏi ban đầu vẫn còn mặc dù - tại sao Jersey không phát hiện kết nối đã đóng và báo cáo là do chính nó? – Hank

0

Im tự hỏi nếu bởi subclassing bạn có thể thay đổi hành vi.

@Override 
    public void onClose(final ChunkedOutput<OutboundEvent> output) { 
     int cons = connectionCounter.decrementAndGet(); 
     log.debug("A connection has been closed. Active connection count: "+ cons); 
    } 

Trong này, bạn không đóng ChunkedOutput để nó không giải phóng kết nối. Đây có phải là vấn đề không?

+0

Không, điều đó không liên quan gì đến nó. Trong thực tế, đó là cách khác vòng, 'onClose()' được gọi là, khi 'OutboundEvent' được đóng bởi Broadcaster. – Hank

0

Trong tài liệu của các nhà xây dựng org.glassfish.jersey.media.sse.SseBroadcaster.SseBroadcaster(), nó nói:

Tạo một thể hiện mới. Nếu hàm tạo này được gọi bởi một lớp con, nó giả định lý do cho lớp con tồn tại là thực hiện các phương thức onClose(org.glassfish.jersey.server.ChunkedOutput)onException(org.glassfish.jersey.server.ChunkedOutput, Exception), vì vậy nó thêm đối tượng mới được tạo làm trình nghe.Để tránh điều này, các lớp con có thể gọi số SseBroadcaster(Class) chuyển lớp của chúng làm đối số.

Vì vậy, bạn không nên để lại constructor mặc định và cố gắng thực hiện constructor của bạn gọi siêu với lớp học của bạn:

public CountingSseBroadcaster(){ 
    super(CountingSseBroadcaster.class); 
} 
Các vấn đề liên quan