2015-04-10 15 views
10

Có thể sử dụng ReactiveExtensions để đạt được những điều sau đây hay không;Hợp nhất hai Quan sát với một ưu tiên cao hơn

  • Hai quan sát, một trong đó là ưu tiên "cao" và người kia "Thấp"

  • Kết hợp cả hai quan sát thành một, mà sau đó có thể được đăng ký, với ý định rằng đây quả Quan sát sẽ luôn phát ra các mục ưu tiên cao trước bất kỳ mục ưu tiên thấp nào.

Tôi hiểu rằng điều này có thể được triển khai nhỏ hơn bằng cách sử dụng hai bộ sưu tập ConcurrentQueue và một cái gì đó như thế này;

return this.highPriorityItems.TryDequeue(out item) 
    || this.lowPriorityItems.TryDequeue(out item); 

Nhưng phương pháp này có vấn đề như không bị "subscribable" theo cách tương tự mà một Quan sát sẽ là (vì vậy một khi hàng đợi đang cạn kiệt, chế biến sẽ kết thúc mà không có nhiều câu nói đùa thêm để đẩy này ra thành một Bài tập).

Hơn nữa, tôi muốn quan tâm đến việc áp dụng một số bộ lọc bổ sung trên hàng đợi, như điều chỉnh và "riêng biệt cho đến khi thay đổi", vì vậy Rx có vẻ như một sự phù hợp tự nhiên ở đây.

+0

Vì vậy, bàn tay quan sát "thấp" (bất kể đoạn mã mới này) là một giá trị. Mã này có ý nghĩa gì? Chờ mãi để xem liệu "cao" có thể quan sát được * bao giờ * tạo ra một giá trị khác không? Tôi đang đấu tranh để xem các ưu tiên có thể hoạt động như thế nào và chỉ tiếp tục quay lại sử dụng 'Hợp nhất' và bỏ qua các ưu tiên. –

+0

@Damien_The_Unbeliever, ý tưởng về cơ bản là "Hợp nhất" nhưng với các mục ưu tiên cao bỏ qua phía trước. Điều đó có ý nghĩa? Tôi muốn xử lý cả hai nguồn mục trong cùng một cách, do đó Hợp nhất và đăng ký không biết về mức độ ưu tiên. Nhưng đó là thứ tự mà họ đến nơi mà tôi đang tập trung vào - hãy tưởng tượng tôi có 20 mục ưu tiên thấp sắp tới, và một mục ưu tiên cao mới được gửi đi. Tôi muốn điều đó được phát ra tiếp theo, không phải sau 20 mục khác. –

+0

Nhưng bạn chỉ từng thấy một mục tại một thời điểm. Nếu ưu tiên "thấp" đã vượt qua bạn năm mục cho đến thời điểm này và * dự định * sẽ gửi cho bạn thêm 15 mục nữa, thì sao? Và bạn chắc chắn không có cách nào phát hiện ra thực tế đó hoàn toàn từ các quan sát được. –

Trả lời

4

Điều bạn mô tả tất nhiên là hàng đợi ưu tiên.

Rx là tất cả về luồng sự kiện, thay vì hàng đợi. Tất nhiên, hàng đợi được sử dụng nhiều trong Rx - nhưng chúng không phải là khái niệm hạng nhất, phần chi tiết hơn về chi tiết triển khai của các khái niệm của Rx.

Một ví dụ tốt về nơi chúng tôi cần hàng đợi là đối phó với người quan sát chậm. Các sự kiện được gửi đi tuần tự trong Rx và nếu các sự kiện đến nhanh hơn người quan sát có thể xử lý chúng, thì chúng phải được xếp hàng đợi với người quan sát đó. Nếu có nhiều người quan sát, thì nhiều hàng đợi logic phải được duy trì, vì các nhà quan sát có thể tiến triển ở các bước khác nhau - và Rx chọn không giữ chúng trong bước khóa.

"Áp suất ngược" là khái niệm về các nhà quan sát cung cấp phản hồi cho các quan sát để cho phép các cơ chế xử lý áp suất của một quan sát nhanh hơn - chẳng hạn như giảm phát hoặc điều chỉnh. Rx không có cách thức đầu tiên để giới thiệu áp suất ngược - chỉ được xây dựng trong có nghĩa là một quan sát viên quan sát có thể quan sát được thông qua bản chất đồng bộ của OnNext. Bất kỳ cơ chế nào khác sẽ cần phải nằm ngoài băng tần. Câu hỏi của bạn liên quan trực tiếp đến áp lực ngược, vì nó chỉ có liên quan trong trường hợp người quan sát chậm.

Tôi đề cập đến tất cả những điều này để cung cấp bằng chứng cho tuyên bố của tôi rằng Rx không phải là lựa chọn tuyệt vời để cung cấp loại công văn ưu tiên mà bạn đang tìm kiếm - thực sự, cơ chế xếp hàng đầu tiên có vẻ phù hợp hơn.

Để giải quyết vấn đề trong tầm tay, bạn cần tự quản lý xếp hàng ưu tiên, trong toán tử tùy chỉnh. Để giải quyết vấn đề: những gì bạn đang nói là nếu các sự kiện đến trong quá trình xử lý sự kiện OnNext, chẳng hạn như có sự kiện để gửi, thì thay vì hàng đợi FIFO điển hình mà Rx sử dụng, bạn muốn công văn dựa trên một số ưu tiên.

Điều cần lưu ý là tinh thần Rx không giữ nhiều người quan sát trong bước khóa, các nhà quan sát đồng thời sẽ có khả năng thấy các sự kiện theo một thứ tự khác, có thể hoặc không có vấn đề gì đối với bạn. Bạn có thể sử dụng một cơ chế như Publish để có được sự nhất quán trật tự - nhưng có thể bạn không muốn làm điều này vì thời gian phân phối sự kiện sẽ nhận được khá khó đoán và không hiệu quả trong trường hợp đó.

tôi chắc chắn rằng có những cách tốt hơn để làm điều này, nhưng đây là một ví dụ về một giao ưu tiên hàng đợi dựa - bạn có thể mở rộng này để làm việc cho nhiều con suối và những ưu tiên (ưu tiên hoặc thậm chí mỗi sự kiện) sử dụng một thực hiện hàng đợi tốt hơn (chẳng hạn như hàng đợi ưu tiên dựa trên b-tree) nhưng tôi đã chọn để duy trì điều này khá đơn giản. Thậm chí sau đó, lưu ý số lượng đáng kể các mối quan tâm mà mã phải giải quyết, xung quanh việc xử lý lỗi, hoàn thành, v.v. - và tôi đã lựa chọn khi chúng được báo hiệu rằng chắc chắn có nhiều lựa chọn hợp lệ khác.

Tất cả trong tất cả, việc triển khai này chắc chắn đặt tôi khỏi ý tưởng sử dụng Rx cho điều này. Đó là đủ phức tạp mà có lẽ có lỗi ở đây anyway. Như tôi đã nói, có thể là mã gọn gàng cho việc này (đặc biệt là cho các nỗ lực tối thiểu Tôi đã đặt vào nó!), Nhưng khái niệm, tôi thấy không thoải mái với ý tưởng không phụ thuộc vào thực hiện:

public static class ObservableExtensions 
{ 
    public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
     this IObservable<TSource> source, 
     IObservable<TSource> lowPriority, 
     IScheduler scheduler = null) 
    {  
     scheduler = scheduler ?? Scheduler.Default; 
     return Observable.Create<TSource>(o => {  
      // BufferBlock from TPL dataflow is used as it is 
      // handily awaitable. package: Microsoft.Tpl.Dataflow   
      var loQueue = new BufferBlock<TSource>(); 
      var hiQueue = new BufferBlock<TSource>(); 
      var errorQueue = new BufferBlock<Exception>(); 
      var done = new TaskCompletionSource<int>(); 
      int doneCount = 0; 
      Action incDone =() => { 
       var dc = Interlocked.Increment(ref doneCount); 
       if(dc == 2) 
        done.SetResult(0); 
      }; 
      source.Subscribe(
       x => hiQueue.Post(x), 
       e => errorQueue.Post(e), 
       incDone); 
      lowPriority.Subscribe(
       x => loQueue.Post(x), 
       e => errorQueue.Post(e), 
       incDone); 
      return scheduler.ScheduleAsync(async(ctrl, ct) => { 
       while(!ct.IsCancellationRequested) 
       { 
        TSource nextItem; 
        if(hiQueue.TryReceive(out nextItem) 
         || loQueue.TryReceive(out nextItem)) 
         o.OnNext(nextItem); 

        else if(done.Task.IsCompleted) 
        { 
         o.OnCompleted(); 
         return; 
        } 

        Exception error;       
        if(errorQueue.TryReceive(out error)) 
        { 
         o.OnError(error); 
         return; 
        } 

        var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);  
        var loAvailableAsync = loQueue.OutputAvailableAsync(ct);      
        var errAvailableAsync = 
         errorQueue.OutputAvailableAsync(ct); 
        await Task.WhenAny(
         hiAvailableAsync, 
         loAvailableAsync, 
         errAvailableAsync, 
         done.Task); 
       } 
      }); 
     }); 
    } 
} 

Và ví dụ về cách sử dụng:

void static Main() 
{ 
    var xs = Observable.Range(0, 3); 
    var ys = Observable.Range(10, 3); 

    var source = ys.MergeWithLowPriorityStream(xs); 

    source.Subscribe(Console.WriteLine,() => Console.WriteLine("Done")); 
} 

Trước tiên, điều này sẽ cho biết mức ưu tiên cao hơn của chúng.

+0

Cảm ơn bạn James, lời giải thích của bạn đã mở mắt của tôi nhiều hơn một chút với mục đích thực tế và khả năng của Rx. Bạn đang phải tất nhiên, giải pháp Rx đang cố gắng để phù hợp với một hình vuông peg trong một lỗ tròn. Tôi đột nhiên cảm thấy bắt buộc phải đọc IntroToRx từ đầu đến cuối! Cảm ơn một lần nữa vì đã dành thời gian để cung cấp câu trả lời này, nó được đánh giá cao. –

1

Bạn cần tính thời gian để giải quyết vấn đề như thế này. Trong nhận xét ở trên, bạn nói về thông báo của người dùng. Dường như với tôi rằng những gì bạn muốn là một cách để nói là một cái gì đó như thế này: Hiển thị thông báo gần đây nhất, trừ khi có một thông báo ưu tiên cao, trong trường hợp đó hiển thị đó.

Sơ đồ bong bóng sẽ giúp dễ dàng hơn để giải thích về điều này. Một ký tự là một giây:

High : ---------3---5-6 
Low : 1--2-------4---- 
Result: 1--2-----3---5-6 

Đó có phải là những gì bạn nghĩ không? Bạn có muốn để đệm tin nhắn và hiển thị chúng sau này? Giống như trong trường hợp này, có ổn không khi thông báo 5 chỉ hiển thị trong 2 giây?

Các vấn đề liên quan