2015-03-04 17 views
21

Đây là hình ảnh về những gì tôi đang cố gắng hoàn thành.Chia Rx Quan sát được thành nhiều luồng và xử lý riêng lẻ

--abca - bbb - một sự chia rẽ

vào

--một ----- một ------- a -> một dòng

- --- b ------ bbb --- -> dòng b

------ c ---------- -> dòng c

Sau đó, , có thể

a.subscribe() 
b.subscribe() 
c.subscribe() 

Cho đến nay, mọi thứ tôi đã tìm thấy đã tách luồng bằng cách sử dụng một nhómBởi(), nhưng sau đó thu gọn mọi thứ lại thành một luồng và xử lý tất cả chúng trong cùng một hàm. Những gì tôi muốn làm là xử lý từng luồng có nguồn gốc theo một cách khác.

Cách tôi đang thực hiện ngay bây giờ là thực hiện một loạt các bộ lọc. Có cách nào tốt hơn để làm điều này?

Trả lời

7

Bạn không phải thu gọn Observables từ groupBy. Thay vào đó, bạn có thể đăng ký với họ.

Something như thế này:

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"}; 

    Action1<String> a = s -> System.out.print("-a-"); 

    Action1<String> b = s -> System.out.print("-b-"); 

    Action1<String> c = s -> System.out.print("-c-"); 

    Observable 
      .from(inputs) 
      .groupBy(s -> s) 
      .subscribe((g) -> { 
       if ("a".equals(g.getKey())) { 
        g.subscribe(a); 
       } 

       if ("b".equals(g.getKey())) { 
        g.subscribe(b); 
       } 

       if ("c".equals(g.getKey())) { 
        g.subscribe(c); 
       } 
      }); 

Nếu báo cáo trông kinda xấu xí nhưng ít nhất bạn có thể xử lý từng dòng riêng biệt. Có thể có cách tránh chúng.

+0

Vâng, tôi muốn tránh những người đó nếu có thể. Tuy nhiên, nếu nó hoạt động, sau đó nó sẽ trông sạch hơn một chút kể từ khi tất cả ở một nơi, thay vì làm bộ lọc trên luồng ban đầu. Cảm ơn! –

+0

Làm việc như một sự quyến rũ! –

+0

Tuyệt! Tôi sẽ cập nhật câu trả lời của mình nếu tôi tìm ra cách loại bỏ các câu lệnh 'if'. – ihuk

31

dễ dàng như chiếc bánh, chỉ cần sử dụng filter

Một ví dụ trong scala

import rx.lang.scala.Observable 

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a") 
val hotO: Observable[String] = o.share 
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a") 
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b") 
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c") 

aSource.subscribe(o ⇒ println("A: " + o), println,() ⇒ println("A Completed")) 

bSource.subscribe(o ⇒ println("B: " + o), println,() ⇒ println("B Completed")) 

cSource.subscribe(o ⇒ println("C: " + o), println,() ⇒ println("C Completed")) 

Bạn chỉ cần chắc chắn rằng nguồn quan sát được là nóng. Cách dễ nhất là share.

+2

Điều gì xảy ra nếu bạn muốn hoặc ban đầu có thể quan sát được cảm lạnh? –

+7

@double_squeeze chỉ sử dụng 'publish' thay vì' share' và gọi 'connect' khi tất cả các subsribers được đăng ký. –

+0

Được thăng hạng do nhận xét từ @Krzysztof Skyrzynecki – CrazyBS

Các vấn đề liên quan