2016-02-10 21 views
5

Tôi đang sử dụng một ứng dụng trò chơi nhiều người gọi là AppWarp (http://appwarp.shephertz.com), nơi bạn có thể thêm trình xử lý sự kiện được gọi lại khi sự kiện xảy ra, giả sử chúng ta sẽ nói về Trình nghe kết nối, nơi bạn cần triển khai giao diện này :Làm thế nào để chuyển đổi đúng cách Người nghe thành Phản ứng (Quan sát) bằng RxJava?

public interface ConnectionRequestListener { 
    void onConnectDone(ConnectEvent var1); 
    void onDisconnectDone(ConnectEvent var1); 
    void onInitUDPDone(byte var1); 
} 

mục tiêu của tôi ở đây là để chủ yếu là tạo ra một phiên bản phản ứng của khách hàng này sẽ được sử dụng trong Apps nội thay vì sử dụng client bản thân trực tiếp (tôi cũng sẽ dựa trên giao diện sau đó thay vì chỉ phụ thuộc vào WarpClient chính nó như trong ví dụ, nhưng đó không phải là điểm quan trọng, xin vui lòng đọc câu hỏi của tôi ở cuối cùng).

Vì vậy, những gì tôi đã làm là như sau:

1) tôi đã giới thiệu một sự kiện mới, đặt tên nó là RxConnectionEvent (Mà chủ yếu là nhóm Kết nối liên quan đến các sự kiện) như sau:

public class RxConnectionEvent { 
    // This is the original connection event from the source client 
    private final ConnectEvent connectEvent; 
    // this is to identify if it was Connection/Disconnection 
    private final int eventType; 

    public RxConnectionEvent(ConnectEvent connectEvent, int eventType) { 
     this.connectEvent = connectEvent; 
     this.eventType = eventType; 
    } 

    public ConnectEvent getConnectEvent() { 
     return connectEvent; 
    } 

    public int getEventType() { 
     return eventType; 
    } 
} 

2) Đã tạo một số loại sự kiện như sau:

public class RxEventType { 
    // Connection Events 
    public final static int CONNECTION_CONNECTED = 20; 
    public final static int CONNECTION_DISCONNECTED = 30; 
} 

3) Tạo các quan sát sau đó phát ra RxConnectionEvent mới của tôi

import com.shephertz.app42.gaming.multiplayer.client.WarpClient; 
import com.shephertz.app42.gaming.multiplayer.client.events.ConnectEvent; 
import rx.Observable; 
import rx.Subscriber; 
import rx.functions.Action0; 
import rx.subscriptions.Subscriptions; 

public class ConnectionObservable extends BaseObservable<RxConnectionEvent> { 

    private ConnectionRequestListener connectionListener; 

    // This is going to be called from my ReactiveWarpClient (Factory) Later. 
    public static Observable<RxConnectionEvent> createConnectionListener(WarpClient warpClient) { 
     return Observable.create(new ConnectionObservable(warpClient)); 
    } 

    private ConnectionObservable(WarpClient warpClient) { 
     super(warpClient); 
    } 

    @Override 
    public void call(final Subscriber<? super RxConnectionEvent> subscriber) { 
     subscriber.onStart(); 
     connectionListener = new ConnectionRequestListener() { 
      @Override 
      public void onConnectDone(ConnectEvent connectEvent) { 
       super.onConnectDone(connectEvent); 
       callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_CONNECTED)); 
      } 

      @Override 
      public void onDisconnectDone(ConnectEvent connectEvent) { 
       super.onDisconnectDone(connectEvent); 
       callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_DISCONNECTED)); 
      } 

      // not interested in this method (for now) 
      @Override 
      public void onInitUDPDone(byte var1) { } 

      private void callback(RxConnectionEvent rxConnectionEvent) 
      { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(rxConnectionEvent); 
       } else { 
        warpClient.removeConnectionRequestListener(connectionListener); 
       } 
      } 
     }; 

     warpClient.addConnectionRequestListener(connectionListener); 
     subscriber.add(Subscriptions.create(new Action0() { 
      @Override 
      public void call() { 
       onUnsubscribed(warpClient); 
      } 
     })); 
    } 

    @Override 
    protected void onUnsubscribed(WarpClient warpClient) { 
     warpClient.removeConnectionRequestListener(connectionListener); 
    } 
} 

4) và cuối cùng BaseObservable của tôi trông giống như sau:

public abstract class BaseObservable<T> implements Observable.OnSubscribe<T> { 

    protected WarpClient warpClient; 

    protected BaseObservable (WarpClient warpClient) 
    { 
     this.warpClient = warpClient; 
    } 

    @Override 
    public abstract void call(Subscriber<? super T> subscriber); 

    protected abstract void onUnsubscribed(WarpClient warpClient); 
} 

Câu hỏi của tôi là chủ yếu: được thực hiện của tôi ở trên là đúng hoặc thay vào đó tôi nên tạo riêng biệt có thể quan sát được cho từng sự kiện, nhưng nếu vậy, khách hàng này có hơn 40-50 sự kiện tôi có phải tạo riêng biệt có thể quan sát được cho từng sự kiện không?

tôi cũng sử dụng đoạn mã trên như sau (sử dụng nó trong một "phi chính thức" thử nghiệm tích hợp đơn giản):

public void testConnectDisconnect() { 
    connectionSubscription = reactiveWarpClient.createOnConnectObservable(client) 
      .subscribe(new Action1<RxConnectionEvent>() { 
       @Override 
       public void call(RxConnectionEvent rxEvent) { 
        assertEquals(WarpResponseResultCode.SUCCESS, rxEvent.getConnectEvent().getResult()); 
        if (rxEvent.getEventType() == RxEventType.CONNECTION_CONNECTED) { 
         connectionStatus = connectionStatus | 0b0001; 
         client.disconnect(); 
        } else { 
         connectionStatus = connectionStatus | 0b0010; 
         connectionSubscription.unsubscribe(); 
         haltExecution = true; 
        } 
       } 
      }, new Action1<Throwable>() { 
       @Override 
       public void call(Throwable throwable) { 
        fail("Unexpected error: " + throwable.getMessage()); 
        haltExecution = true; 
       } 
      }); 

    client.connectWithUserName("test user"); 
    waitForSomeTime(); 
    assertEquals(0b0011, connectionStatus); 
    assertEquals(true, connectionSubscription.isUnsubscribed()); 
} 

Trả lời

2

tôi đề nghị bạn tránh kéo dài BaseObservable trực tiếp vì nó rất dễ bị lỗi. Thay vào đó, hãy thử sử dụng các công cụ Rx tự cung cấp cho bạn để tạo ra quan sát của bạn.

Giải pháp dễ nhất là sử dụng PublishSubject, vừa là Đài quan sát vừa là người đăng ký. Người nghe chỉ đơn giản là cần phải gọi chủ đề của onNext, và chủ đề sẽ phát ra sự kiện. Dưới đây là một ví dụ làm việc đơn giản:

public class PublishSubjectWarpperDemo { 

    public interface ConnectionRequestListener { 
     void onConnectDone(); 

     void onDisconnectDone(); 

     void onInitUDPDone(); 
    } 

    public static class RxConnectionEvent { 
     private int type; 

     public RxConnectionEvent(int type) { 
      this.type = type; 
     } 

     public int getType() { 
      return type; 
     } 

     public String toString() { 
      return "Event of Type " + type; 
     } 
    } 

    public static class SimpleCallbackWrapper { 
     private final PublishSubject<RxConnectionEvent> subject = PublishSubject.create(); 

     public ConnectionRequestListener getListener() { 
      return new ConnectionRequestListener() { 

       @Override 
       public void onConnectDone() { 
        subject.onNext(new RxConnectionEvent(1)); 
       } 

       @Override 
       public void onDisconnectDone() { 
        subject.onNext(new RxConnectionEvent(2)); 
       } 

       @Override 
       public void onInitUDPDone() { 
        subject.onNext(new RxConnectionEvent(3)); 
       } 
      }; 
     } 

     public Observable<RxConnectionEvent> getObservable() { 
      return subject; 
     } 

    } 

    public static void main(String[] args) throws IOException { 
     SimpleCallbackWrapper myWrapper = new SimpleCallbackWrapper(); 
     ConnectionRequestListener listner = myWrapper.getListener();// Get the listener and attach it to the game here. 
     myWrapper.getObservable().observeOn(Schedulers.newThread()).subscribe(event -> System.out.println(event)); 

     listner.onConnectDone(); // Call the listener a few times, the observable should print the event 
     listner.onDisconnectDone(); 
     listner.onInitUDPDone(); 

     System.in.read(); // Wait for enter 
    } 
} 

Một giải pháp phức tạp hơn sẽ được sử dụng một trong những onSubscribe triển khai để tạo ra một quan sát được sử dụng Observable.create(). Ví dụ: AsyncOnSubscibe. Giải pháp này có lợi ích của việc xử lý ngược lại đúng cách, vì vậy người đăng ký sự kiện của bạn không bị quá tải với các sự kiện. Nhưng trong trường hợp của bạn, điều đó nghe có vẻ như một kịch bản không chắc chắn, vì vậy sự phức tạp thêm có lẽ không đáng giá.

Các vấn đề liên quan