2015-05-03 13 views
8

Tôi có một ứng dụng máy khách (Java EE & Android), thông tin liên lạc qua websockets. Các công trình truyền thông và cũng là giao thức để gửi các đối tượng như json, sẽ được bao bọc, tuần tự hóa, gửi đi, deserialized, unwrapped và xây dựng lại một cách chính xác. Cả hai ứng dụng đang sử dụng một dự án thư viện khác với tất cả các lớp yêu cầu và đáp ứng có thể có.yêu cầu đồng bộ hóa/phản hồi trong ngữ cảnh nio

Bây giờ vấn đề của tôi: Thư viện cũng nên triển khai chiến lược truyền thông không chặn, nhưng thực hiện yêu cầu phản hồi trong suốt. Có lẽ tôi không phải là người đầu tiên với vấn đề đó, vì vậy tôi nghĩ rằng có thể có một số triển khai tốt đẹp ra khỏi đó :).

Những gì tôi muốn:

// server should sleep 5000ms and then return 3*3 
Future<Integer> f1 = server.put(
    new SleepAndReturnSquareRequest(5000, 3), 
    new FutureCallback<Integer>{ 
    public void onSuccess(Integer square) { 
     runOnUiThread(new Runnable{ 
     // Android Toast show square 
     }); 
    } 

    // impl onFailure 
    } 
); 

Future<Date> f2 = server.put(
    new TimeRequest(), 
    new FutureCallback<Date>{ 
    public void onSuccess(Date date) { 
     // called before other onSuccess 
    } 

    // impl onFailure 
    } 
); 

// e.g. when the activity in android changes I'll cancel all futures. That means no more callbacks and (later) if possible client sends cancellations to the server for long requests. 

Mã nên gửi một SleepAndReturnRequest và sau đó một TimeRequest tất nhiên non-blocking. Yêu cầu đầu tiên mất 5 giây, yêu cầu thứ hai gần bằng không đến millis. Tôi hy vọng việc thực hiện để gọi lại cuộc gọi thứ hai ngay lập tức sau khi nhận được phản hồi, trong khi gọi lại đầu tiên được gọi sau khoảng 5 giây. Việc triển khai chịu trách nhiệm đối với phản hồi yêu cầu-phản hồi ở phía yêu cầu.

Những gì tôi đã cố gắng và nghĩ về:

của Google ổi listenable future sẽ là một cách tiếp cận tốt cho 'phản ứng phụ' Tôi nghĩ rằng, bởi vì nó chỉ là một nhiệm vụ chạy trên bất kỳ chủ đề, gửi kết quả trở lại cuối cùng. Điều đó sẽ dễ dàng hơn.

Đối với 'bên yêu cầu', tôi cần một số triển khai bổ sung số nhận dạng duy nhất cho thông báo để có thể khớp với phản hồi. Hy vọng rằng bạn có thể cho tôi biết một số gói làm công việc đó.

Cảm ơn bạn đã trợ giúp.

// chỉnh sửa:

Tôi nghĩ rằng câu hỏi của tôi đã bị hiểu lầm hoặc không đủ chính xác. Hãy nghĩ về cách triển khai GET hoặc POST qua websocket. Mỗi yêu cầu GET/POST có một phản hồi và sau đó kết nối được đóng lại. Các máy khách kết nối với một cổng cụ thể, máy chủ lấy một luồng từ một nhóm luồng, xử lý yêu cầu và đáp ứng. Kết hợp yêu cầu phản hồi mà tôi nghĩ là được thực hiện trong lớp truyền tải số 4.

Vì tôi muốn sử dụng ổ cắm web, tôi phải triển khai đối sánh đó trong lớp phần mềm 7.

Dưới đây là một số bước tôi đang thực hiện. K là loại khóa duy nhất, V là loại nội dung chung của một tin nhắn. Đó có thể là một chuỗi, dòng byte, bất cứ điều gì.

public class Synchronizer<K, V> implements UniqueMessageListener<K, V> { 

    private final ConcurrentMap<K, FutureCallback<V>> callbackMap = new ConcurrentHashMap<>(); 

    private final ListeningExecutorService executor; 

    private final UniqueMessageFactory<K, V> factory; 
    private final UniqueMessageSender<K, V> sender; 
    private UniqueMessageReceiver<K, V> receiver = null; 

    public Synchronizer(
      ListeningExecutorService executor, 
      UniqueMessageFactory<K, V> factory, 
      UniqueMessageSender<K, V> sender 
    ) { 
     this.executor = executor; 
     this.factory = factory; 
     this.sender = sender; 
    } 

    public void register(UniqueMessageReceiver<K, V> receiver) { 

     unregister(); 

     this.receiver = receiver; 
     receiver.addListener(this); 
    } 

    public void unregister() { 
     if(receiver != null) { 
      receiver.removeListener(this); 
      receiver = null; 
     } 
    } 

    public Future<V> put(Message<V> message, final FutureCallback<V> callback) { 
     final UniqueMessage<K, V> uniqueMessage = factory.create(message); 

     final Future<Boolean> sendFuture = sender.send(uniqueMessage); 

     final ListenableFuture<Boolean> listenableSendFuture = 
       JdkFutureAdapters.listenInPoolThread(sendFuture, executor); 


     listenableSendFuture.addListener(
       new Runnable() { 
        @Override 
        public void run() { 
         try { 
          if(listenableSendFuture.get() == true) { 
           callbackMap.put(
             uniqueMessage.getId(), 
             callback 
           ); 
          } else { 
           // maybe try it later again? 
          } 
         } catch(Exception e) { 
          // ... 
         } 
        } 
       }, 
       executor 
     ); 

     // implement cancel 
     return new SynchronizeFuture<>(
       listenableSendFuture, 
       callback 
     ); 
    } 

    @Override 
    public void onReceive(UniqueMessage<K, V> message) { 
     K id = message.getId(); 
     FutureCallback<V> callback; 

     callback = callbackMap.remove(id); 

     if(callback != null) { 
      callback.onSuccess(message.getContent()); 
     } 
    } 
} 

Có rất nhiều thứ để kiểm tra, nhưng tôi nghĩ nó sẽ hoạt động.

+0

Có cái gì đó trong râm hoặc Netty làm việc cho tôi? – Aitch

+0

Bạn cần một cái gì đó để tạo ra một transactionId ở phía khách hàng, đó là câu hỏi của bạn? –

+0

@Mr_Thorynque no.Đó là về cách thực hiện đồng bộ hóa các yêu cầu và phản hồi thông qua websockets. Sử dụng id giao dịch duy nhất khá rõ ràng. Đọc mã của tôi ở trên. – Aitch

Trả lời

-1

Bạn có thể thử ProtoBuf-RPC-Pro - nhưng nó không cho WebSockets mặc dù tôi nghĩ rằng đó là một dự án trên GitHub hỗ trợ nó :)

Các vấn đề liên quan