2014-07-16 13 views
8

Observable.Concat là một triển khai tham gia các quan sát nhưng chỉ IObservable<T> giây mới thực hiện đăng ký khi lần đầu tiên được hoàn thành.Concat nóng trong Rx

http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html#Concat

Có thực hiện "HotConcat" không? Tương tự như Observable.Merge, nhưng vẫn giữ thứ tự phân phối, trước tiên hãy đẩy các thành phần của đăng ký ban đầu và sau đó là các phần tiếp theo. Một cái gì đó như: Hot Concat

Tôi biết rằng có thể sử dụng các ReplaySubject<T>, nhưng nó không có vẻ rất tốt, bởi vì hiệu suất và sử dụng bộ nhớ tác động ..

Trả lời

6

Đây là triển khai tôi đã sử dụng trong một thời gian. Việc triển khai này giới thiệu một nhà điều hành BufferUntilSubscribed, biến số IObservable thành một IConnectableObservable sẽ bắt đầu lưu vào bộ đệm bất cứ khi nào bạn gọi Connect và sẽ phân phối các kết quả đã lưu vào người đăng ký đầu tiên. Sau khi người đăng ký đầu tiên "bắt kịp", khi đó bộ đệm sẽ dừng và người đăng ký sẽ chỉ được cung cấp các sự kiện trực tiếp khi họ đến.

Khi bạn đã có, bạn có thể viết HotConcat như một cái gì đó như:

public static IObservable<T> HotConcat<T>(params IObservable<T>[] sources) 
{ 
    var s2 = sources.Select(s => s.BufferUntilSubscribed()); 
    var subscriptions = new CompositeDisposable(s2.Select(s2 => s2.Connect()).ToArray()); 
    return Observable.Create<T>(observer => 
    { 
     var s = new SingleAssignmentDisposable(); 
     var d = new CompositeDisposable(subscriptions); 
     d.Add(s); 

     s.Disposable = s2.Concat().Subscribe(observer); 

     return d; 
    }); 
} 

Đây là implemementation của BufferUntilSubscribed:

private class BufferUntilSubscribedObservable<T> : IConnectableObservable<T> 
{ 
    private readonly IObservable<T> _source; 
    private readonly IScheduler _scheduler; 
    private readonly Subject<T> _liveEvents; 
    private bool _observationsStarted; 
    private Queue<T> _buffer; 
    private readonly object _gate; 

    public BufferUntilSubscribedObservable(IObservable<T> source, IScheduler scheduler) 
    { 
     _source = source; 
     _scheduler = scheduler; 
     _liveEvents = new Subject<T>(); 
     _buffer = new Queue<T>(); 
     _gate = new object(); 
     _observationsStarted = false; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     lock (_gate) 
     { 
      if (_observationsStarted) 
      { 
       return _liveEvents.Subscribe(observer); 
      } 

      _observationsStarted = true; 

      var bufferedEvents = GetBuffers().Concat().Finally(RemoveBuffer); // Finally clause to remove the buffer if the first observer stops listening. 
      return Observable.Merge(_liveEvents, bufferedEvents).Subscribe(observer); 
     } 
    } 

    public IDisposable Connect() 
    { 
     return _source.Subscribe(OnNext, _liveEvents.OnError, _liveEvents.OnCompleted); 
    } 

    private void RemoveBuffer() 
    { 
     lock (_gate) 
     { 
      _buffer = null; 
     } 
    } 

    /// <summary> 
    /// Acquires a lock and checks the buffer. If it is empty, then replaces it with null and returns null. Else replaces it with an empty buffer and returns the old buffer. 
    /// </summary> 
    /// <returns></returns> 
    private Queue<T> GetAndReplaceBuffer() 
    { 
     lock (_gate) 
     { 
      if (_buffer == null) 
      { 
       return null; 
      } 

      if (_buffer.Count == 0) 
      { 
       _buffer = null; 
       return null; 
      } 

      var result = _buffer; 
      _buffer = new Queue<T>(); 
      return result; 
     } 
    } 

    /// <summary> 
    /// An enumerable of buffers that will complete when a call to GetAndReplaceBuffer() returns a null, e.g. when the observer has caught up with the incoming source data. 
    /// </summary> 
    /// <returns></returns> 
    private IEnumerable<IObservable<T>> GetBuffers() 
    { 
     Queue<T> buffer; 
     while ((buffer = GetAndReplaceBuffer()) != null) 
     { 
      yield return buffer.ToObservable(_scheduler); 
     } 
    } 

    private void OnNext(T item) 
    { 
     lock (_gate) 
     { 
      if (_buffer != null) 
      { 
       _buffer.Enqueue(item); 
       return; 
      } 
     } 

     _liveEvents.OnNext(item); 
    } 
} 

/// <summary> 
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data. 
/// Thus the observer may subscribe late to a hot observable yet still see all of the data. Later observers will not see the buffered events. 
/// </summary> 
/// <param name="source"></param> 
/// <param name="scheduler">Scheduler to use to dump the buffered data to the observer.</param> 
/// <returns></returns> 
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source, IScheduler scheduler) 
{ 
    return new BufferUntilSubscribedObservable<T>(source, scheduler); 
} 

/// <summary> 
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data. 
/// Thus the observer may subscribe late to a hot observable yet still see all of the data. Later observers will not see the buffered events. 
/// </summary> 
/// <param name="source"></param> 
/// <returns></returns> 
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source) 
{ 
    return new BufferUntilSubscribedObservable<T>(source, Scheduler.Immediate); 
} 
+0

đẹp thực hiện, có vẻ là an toàn về vấn đề đồng thời .. @Brandon , một câu hỏi, tại sao bạn đang sử dụng 'Connect()' trong IObservables trước 'Observable.Create ' và không phải bên trong nó? –

+0

Trên lý thuyết rằng quan sát của bạn đã nóng và bạn muốn đệm chúng ngay cả trước khi bạn đăng ký. Di chuyển nó bên trong Tạo nếu bạn không cần điều đó. – Brandon

+0

Vâng, tất nhiên, nhưng chúng ta cần phải làm theo các hướng dẫn .. Một điều thú vị về nó, hầu hết các sơ đồ đá cẩm thạch mà tôi đọc về 'Observable.Concat' có ý tưởng sai (hoặc hình ảnh) về hoạt động (nếu quan sát nóng được sử dụng). –

1

Tôi không biết một phần như vậy nhưng bạn có thể viết một cái phù hợp với nhu cầu của bạn.

Đây là nỗ lực của tôi khi viết bài này. Nó sẽ giữ các phần tử trong bộ nhớ chỉ cho đến khi chúng được phát lại một lần. Nhưng tôi nghĩ rằng nên có một cách để thực hiện một thực hiện sạch hơn mặc dù.

public static IObservable<T> HotConcat<T>(this IObservable<T> first, IObservable<T> second) 
{ 
    return Observable.Create<T>(observer => 
    { 
     var queue = new Queue<Notification<T>>(); 

     var secondSubscription = second.Materialize().Subscribe(item => 
     { 
      if (queue == null) 
       return; 

      lock (queue) 
      { 
       queue.Enqueue(item); 
      } 
     }); 

     var secondReplay = Observable.Create<T>(secondObserver => 
     { 
      while (true) 
      { 
       Notification<T> item = null; 

       lock (queue) 
       { 
        if (queue.Count > 0) 
        { 
         item = queue.Dequeue(); 
        } 
        else 
        { 
         secondObserver.OnCompleted(); 
         secondSubscription.Dispose(); 
         queue = null; 
         break; 
        } 
       } 

       if (item != null) 
        item.Accept(secondObserver); 
      } 

      return secondSubscription; 
     }); 

     return first.Concat(secondReplay).Concat(second).Subscribe(observer); 
    }); 
}