2016-04-23 29 views
7

Tôi có vấn đề sau:RxJava chia sẻ lượng phát thải có thể quan sát giữa nhiều người đăng ký

Tôi có thể quan sát được công việc. Tôi đã cố gắng đăng ký nhiều lần trên cùng một quan sát, nhưng bên trong nhật ký tôi thấy rằng các quan sát ban đầu được bắt đầu nhiều lần.

thats thats quan sát của tôi tạo ra các đối tượng:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> { 
      if (mApi == null) { 
       //do some work 
      } 
      subscriber.onNext(mApi); 
      subscriber.unsubscribe(); 
     }) 

thats quan sát của tôi rằng cần đối tượng

loadApi().flatMap(api -> api....())); 

Tôi đang sử dụng

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread()) 
       .unsubscribeOn(Schedulers.io() 

trên tất cả các quan sát.

Trả lời

12

Tôi không chắc chắn rằng tôi đã hiểu câu hỏi của bạn một cách chính xác, nhưng tôi cho rằng bạn đang tìm cách chia sẻ lượng phát thải có thể quan sát được giữa nhiều người đăng ký. Có một số cách để làm điều này. Thứ nhất, bạn có thể sử dụng một Connectable Observable như vậy:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish(); 
obs.subscribe(item -> System.out.println("Sub A: " + item)); 
obs.subscribe(item -> System.out.println("Sub B: " + item)); 
obs.connect(); //Now the source observable starts emitting items 

Output:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

Ngoài ra, bạn có thể sử dụng một PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject 
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject 
subject.subscribe(item -> System.out.println("Sub B: " + item));  
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable 

Output:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

Cả hai o f các ví dụ này là một luồng, nhưng bạn có thể dễ dàng thêm các cuộc gọi quan sát hoặc subscirbeOn để làm cho chúng không đồng bộ.

+0

yes i muốn kết quả đầu tiên từ quan sát được trên tất cả các đăng ký – H3x0n

+0

gì tôi nên làm gì khi tôi không biết tần suất quan sát của tôi sẽ được đăng ký và khi người đăng ký nhận được kết quả trước khi tất cả được đăng ký – H3x0n

+0

được trả lời khá nhiều trong [câu hỏi này] (http://stackoverflow.com/questions/35951942/single-observable-with-multiple-subscribers) – JohnWowUs

1

Trước hết, sử dụng Observable.create rất khó và dễ bị sai. Bạn cần một cái gì đó giống như

Observable.create(subscriber -> { 
     if (mApi == null) { 
      //do some work 
     } 
     if (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(mApi); 
      subscriber.onCompleted(); 
      // Not subscriber.unsubscribe(); 
     }    
}) 

Bạn có thể sử dụng

ConnectableObservable<Integer> obs = Observable.just(1).replay(1).autoConnect(); 

Tất cả các thuê bao tiếp theo sẽ nhận được các mục phát ra đơn

obs.subscribe(item -> System.out.println("Sub 1 " + item)); 
obs.subscribe(item -> System.out.println("Sub 2 " + item)); 
obs.subscribe(item -> System.out.println("Sub 3 " + item)); 
obs.subscribe(item -> System.out.println("Sub 4 " + item)); 
+0

tôi đã thử của bạn, nhưng trên đăng ký không nhận được gọi. Tôi có thể quan sát được là kết quả tĩnh công cộng. – H3x0n

+0

Bạn đã thử quan sát ban đầu với .publish(). Replay(). Autoconnect()? – JohnWowUs

+0

khi tôi chỉ sử dụng .share() nó hoạt động. – H3x0n

Các vấn đề liên quan