2015-06-22 25 views
5

Tôi có thể quan sát phát ra chuỗi và tôi muốn nhóm chúng theo ký tự đầu tiên. Thật dễ dàng để làm điều đó với groupBy như thế:RxJava: Cách nhóm số lượng lớn các mục tùy ý bằng chức năng ranh giới của riêng tôi

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc"); 

Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() { 
    public Character call(String row) { 
    return row.charAt(0); 
    } 
}).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() { 
    public Observable<List<String>> call(GroupedObservable<Character, String> group) { 
    return group.toList(); 
    } 
}); 

groupedRows.toBlocking().forEach(new Action1<List<String>>() { 
    public void call(List<String> group) { 
    System.out.println(group); 
    } 
}); 

// Output: 
// [aa, ab, ac] 
// [bb, bc] 
// [cc] 

Nhưng nó không tốt cho mục đích của tôi, bởi vì groupBy chỉ hoàn thành từng nhóm khi nguồn phát ra quan sát được onComplete. Vì vậy, nếu tôi có nhiều hàng, chúng sẽ được thu thập hoàn toàn trong bộ nhớ và chỉ ở hàng cuối cùng "đỏ bừng" và được ghi vào đầu ra.

Tôi cần một cái gì đó như toán tử buffer, nhưng với chức năng của riêng tôi biểu thị ranh giới của mỗi nhóm. Tôi thực hiện nó như thế (biết rằng hàng luôn ra lệnh alphabeticaly):

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc"); 

ConnectableObservable<String> connectableRows = rows.publish(); 

Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() { 
    private char lastChar = 0; 

    public Boolean call(String row) { 
    char currentChar = row.charAt(0); 
    boolean isNewGroup = lastChar != 0 && (currentChar != lastChar); 
    lastChar = currentChar; 
    return isNewGroup; 
    } 
}); 

Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector); 

connectableRows.connect(); 

groupedRows.toBlocking().forEach(new Action1<List<String>>() { 
    public void call(List<String> group) { 
    System.out.println(group); 
    } 
}); 

// Output: 
// [] 
// [] 
// [] 

Nó không làm việc, bởi vì boundarySelector được "ăn" các hàng, và tôi nghĩ rằng đó là lạ vì tôi đặc biệt sử dụng ConnectableObservable để biểu thị rằng Tôi cần hai người đăng ký (boundarySelectorgroupedRows) trước khi bắt đầu phát ra rows.

Một cách tò mò nếu tôi trì hoãn rows sau 1 giây, sau đó mã này hoạt động.

Vì vậy, câu hỏi đặt ra là: làm thế nào để tôi nhóm số lượng tùy ý các hàng bằng cách sử dụng chức năng ranh giới của riêng tôi?

+0

Tại sao bạn không tiêu thụ GroupedObservables reactively thay vì thu thập các giá trị của họ vào danh sách? – akarnokd

+0

Vì tôi cần tất cả các giá trị từ nhóm đơn lẻ để xây dựng một đối tượng duy nhất và đối tượng đơn lẻ này sẽ được đẩy xa hơn để xử lý phản ứng. Trường hợp sử dụng thực tế là tôi có mối quan hệ cha-con trong các thực thể DB, tôi thực hiện SQL tham gia các bảng đó, do đó một hàng cho mỗi đứa trẻ được sinh ra. Sau đó, tôi muốn nhóm các child từ cùng một parent trong một collection, xây dựng parent với childs của nó, và phát ra object cha này. –

+0

"Tôi cần tất cả các giá trị từ nhóm đơn lẻ để xây dựng một đối tượng duy nhất" - điều này nghe có vẻ mâu thuẫn với tôi. Nếu bạn cần tất cả các giá trị từ mỗi nhóm, tại sao bạn muốn cắt các nhóm thành các phần? Btw, bởi vì bạn kết nối trước khi đăng ký, toàn bộ điều chạy qua và bạn nhận được các giá trị rỗng. Bạn sẽ cần một số kết nối tự động nhưng tính năng như vậy hiện đang được xem xét trong RxJava. – akarnokd

Trả lời

1
Observable<Integer> source = Observable.range(0, 100); 

source 
.groupBy(k -> k/10) 
.publish(groups -> groups 
     .map(g -> Pair.of(g.getKey(), g.takeUntil(groups))) 
     .flatMap(kv -> 
      kv.second 
      .doOnNext(v -> System.out.println(kv.first + " value " + v)) 
      .doOnCompleted(() -> System.out.println(kv.first + " done")) 
     )) 
.subscribe() 
; 
0

Tìm thấy một cách để làm điều đó bằng buffer:

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc"); 

ConnectableObservable<String> connectableRows = rows.publish(); 

Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() { 
    private char lastChar = 0; 

    public Boolean call(String row) { 
     char currentChar = row.charAt(0); 
     boolean isNewGroup = lastChar != 0 && (currentChar != lastChar); 
     lastChar = currentChar; 
     return isNewGroup; 
    } 
}); 

Observable<List<String>> groupedRows = connectableRows 
     .refCount() 
     .buffer(boundarySelector); 

groupedRows.toBlocking().forEach(new Action1<List<String>>() { 
    public void call(List<String> group) { 
     System.out.println(group); 
    } 
}); 

// Output: 
// [aa, ab, ac] 
// [bb, bc] 
// [cc] 
Các vấn đề liên quan