2015-06-03 23 views

Trả lời

4

Chỉnh sửa: Vui lòng xem nhận xét của the_joric nếu bạn định sử dụng tính năng này. Có một trường hợp cạnh mà không được xử lý, tôi không thấy một cách nhanh chóng để sửa chữa nó, và vì vậy tôi không có thời gian để sửa chữa nó ngay bây giờ.

Đây là giải pháp trong C#, vì bạn có thẻ system.reactive.

static IObservable<int> MergeSorted(IObservable<int> a, IObservable<int> b) 
{ 
    var source = Observable.Merge(
     a.Select(x => Tuple.Create('a', x)), 
     b.Select(y => Tuple.Create('b', y))); 
    return source.Publish(o => 
    { 
     var published_a = o.Where(t => t.Item1 == 'a').Select(t => t.Item2); 
     var published_b = o.Where(t => t.Item1 == 'b').Select(t => t.Item2); 
     return Observable.Merge(
      published_a.Delay(x => published_b.FirstOrDefaultAsync(y => x <= y)), 
      published_b.Delay(y => published_a.FirstOrDefaultAsync(x => y <= x))); 
    }); 
} 

Ý tưởng được tóm tắt như sau.

  • Khi a phát ra giá trị x, chúng ta trì hoãn nó cho đến khi b phát ra một giá trị yx <= y.

  • Khi b phát ra giá trị y, chúng ta trì hoãn nó cho đến khi a phát ra một giá trị xy <= x.

Nếu bạn chỉ có quan sát nóng, bạn có thể làm như sau. Nhưng sau đây sẽ không hoạt động nếu có bất kỳ quan sát lạnh trong hỗn hợp. Tôi sẽ khuyên bạn nên luôn sử dụng phiên bản hoạt động cho cả các quan sát nóng và lạnh.

static IObservable<int> MergeSortedHot(IObservable<int> a, IObservable<int> b) 
{ 
    return Observable.Merge(
     a.Delay(x => b.FirstOrDefaultAsync(y => x <= y)), 
     b.Delay(y => a.FirstOrDefaultAsync(x => y <= x))); 
} 
+1

Nó dường như không hoạt động khi một trong những quan sát được tạo ra bằng cách sử dụng 'Observable.Create'. Tôi đã đặt ví dụ để gist: https://gist.github.com/skalinets/89f21662a619f685bd6a –

+1

@the_joric Có vẻ như nó không hoạt động khi một trong những quan sát kết thúc trước khi người khác thậm chí được đăng ký. Tôi không có thời gian để sửa lỗi này ngay bây giờ, tuy nhiên, vì vậy tôi sẽ chỉ đến nhận xét của bạn. –

+0

Không biết về tình trạng quá tải 'trễ 'này, cảm ơn! – ionoy

3

Bạn có thể kết hợp, sắp xếp và làm phẳng các trình tự, nhưng nó sẽ có một chi phí đáng kể:

o1.mergeWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

hoặc

o1.concatWith(o2).toSortedList().flatMapIterable(v -> v).subscribe(...) 

Nếu không, bạn cần phải viết một nhà điều hành khá phức tạp .

Sửa 2015/04/06:

Here is một nhà điều hành mà điều này được sắp xếp hợp nhất hiệu quả hơn.

1

Điều này đã được thảo luận một thời gian trước đây trên RxJava mailing list, bạn sẽ tìm thấy một số liên kết đến các giải pháp có thể có trong chuỗi đó.

3

Tôi cũng đang tìm giải pháp sắp xếp hợp nhất hỗ trợ thao tác nén và không thể tìm thấy nó. Vì vậy, quyết định thực hiện nó một cách lỏng lẻo của tôi dựa trên nhà điều hành zip hiện có.

Tương tự như zip, các nhà điều hành hợp nhất được sắp xếp thu thập một mục từ mỗi nguồn quan sát đầu tiên, nhưng sau đó đặt chúng vào một hàng đợi ưu tiên, từ đó phát ra từng cái một theo trật tự tự nhiên của họ hay so sánh cụ thể.

Bạn có thể lấy nó từ GitHub là đã sẵn sàng để sử dụng thư viện hoặc chỉ cần sao chép/dán mã:

https://github.com/ybayk/rxjava-recipes

Xem xét nghiệm đơn vị để sử dụng.

+0

Trong readme của bạn nó nói "Bạn có thể hợp nhất sắp xếp các chuỗi sắp xếp rất lớn hoặc vô hạn". Tôi nghĩ đây là lợi thế chính so với các giải pháp khác. Có lẽ bạn nên làm nổi bật điều đó. – Luciano

0

Còn về việc hợp nhất và sắp xếp thì sao?

@Test 
public void testMergeChains() { 
    Observable.merge(Observable.from(Arrays.asList(1, 2, 13, 11, 5)), Observable.from(Arrays.asList(10, 4, 12, 3, 14, 15))) 
       .collect(ArrayList<Integer>::new, ArrayList::add) 
      .doOnNext(Collections::sort) 
      .subscribe(System.out::println); 

} 

Bạn có thể xem chi tiết ví dụ ở đây

https://github.com/politrons/reactive

+0

Giải pháp này hoạt động nhưng thật không may là không hiệu quả trong trường hợp một lượng lớn dữ liệu: nó yêu cầu tất cả các luồng được tiêu thụ và lưu trữ trong bộ nhớ vì sắp xếp(). – Christophe

Các vấn đề liên quan