2011-09-29 32 views
21

Tôi đang sử dụng phần mở rộng phản ứng để đối chiếu dữ liệu vào bộ đệm của 100ms:Tiện ích mở rộng phản ứng có hỗ trợ bộ đệm lăn không?

this.subscription = this.dataService 
    .Where(x => !string.Equals("FOO", x.Key.Source)) 
    .Buffer(TimeSpan.FromMilliseconds(100)) 
    .ObserveOn(this.dispatcherService) 
    .Where(x => x.Count != 0) 
    .Subscribe(this.OnBufferReceived); 

này hoạt động tốt. Tuy nhiên, tôi muốn có hành vi hơi khác so với hành vi của hoạt động Buffer. Về cơ bản, tôi muốn đặt lại bộ hẹn giờ nếu nhận được một mục dữ liệu khác. Chỉ khi không có dữ liệu nào được nhận cho toàn bộ 100ms thì tôi có muốn xử lý nó không. Điều này mở ra khả năng không bao giờ xử lý dữ liệu, vì vậy tôi cũng có thể chỉ định số lượng tối đa. Tôi sẽ tưởng tượng điều gì đó dọc theo các dòng:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000) 

Tôi đã xem xét và không thể tìm thấy bất kỳ thứ gì như thế này trong Rx? Bất cứ ai có thể xác nhận/từ chối điều này?

+0

Tôi chắc chắn rằng tôi đã thấy hành vi này trong một trong các video hướng dẫn về Rx nhưng tôi sợ rằng tôi không thể nhớ chính xác địa điểm hoặc nơi nào. :( – Chris

+0

Ah, ga (http://msdn.microsoft.com/en-us/library/hh229298%28v=vs.103%29.aspx) là những gì tôi đã nghĩ đến nhưng tôi không nghĩ rằng những gì bạn không muốn chắc chắn rằng có thể có một số cách để kết hợp nó để làm những gì muốn ... – Chris

Trả lời

12

Tôi đã viết phần mở rộng để thực hiện hầu hết những gì bạn đang theo dõi - BufferWithInactivity.

Ở đây là:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source, 
    TimeSpan inactivity, 
    int maximumBufferSize) 
{ 
    return Observable.Create<IEnumerable<T>>(o => 
    { 
     var gate = new object(); 
     var buffer = new List<T>(); 
     var mutable = new SerialDisposable(); 
     var subscription = (IDisposable)null; 
     var scheduler = Scheduler.ThreadPool; 

     Action dump =() => 
     { 
      var bts = buffer.ToArray(); 
      buffer = new List<T>(); 
      if (o != null) 
      { 
       o.OnNext(bts); 
      } 
     }; 

     Action dispose =() => 
     { 
      if (subscription != null) 
      { 
       subscription.Dispose(); 
      } 
      mutable.Dispose(); 
     }; 

     Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = 
      onAction => 
      { 
       lock (gate) 
       { 
        dispose(); 
        dump(); 
        if (o != null) 
        { 
         onAction(o); 
        } 
       } 
      }; 

     Action<Exception> onError = ex => 
      onErrorOrCompleted(x => x.OnError(ex)); 

     Action onCompleted =() => onErrorOrCompleted(x => x.OnCompleted()); 

     Action<T> onNext = t => 
     { 
      lock (gate) 
      { 
       buffer.Add(t); 
       if (buffer.Count == maximumBufferSize) 
       { 
        dump(); 
        mutable.Disposable = Disposable.Empty; 
       } 
       else 
       { 
        mutable.Disposable = scheduler.Schedule(inactivity,() => 
        { 
         lock (gate) 
         { 
          dump(); 
         } 
        }); 
       } 
      } 
     }; 

     subscription = 
      source 
       .ObserveOn(scheduler) 
       .Subscribe(onNext, onError, onCompleted); 

     return() => 
     { 
      lock (gate) 
      { 
       o = null; 
       dispose(); 
      } 
     }; 
    }); 
} 
+0

+1 cảm ơn Bạn đã viết câu hỏi này chỉ cho câu hỏi này hay cho chính mình? Nó đã được sử dụng trong mã sản xuất chưa? –

+0

@KentBoogaart - Tôi đã viết nó vài tháng trước, nhưng nó chưa có trong mã sản xuất. Nó vẫn là một WIP – Enigmativity

+0

+1 Sử dụng Nice SerialDisposable cho ga ... –

0

Tôi đoán đây có thể được thực hiện trên phương pháp đệm như hình dưới đây:

public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max) 
     { 
      return Observable.CreateWithDisposable<IList<T>>(cl => 
      { 
       var acc = new List<T>(); 
       return obs.Buffer(span) 
         .Subscribe(next => 
         { 
          if (next.Count == 0) //no activity in time span 
          { 
           cl.OnNext(acc); 
           acc.Clear(); 
          } 
          else 
          { 
           acc.AddRange(next); 
           if (acc.Count >= max) //max items collected 
           { 
            cl.OnNext(acc); 
            acc.Clear(); 
           } 
          } 
         }, err => cl.OnError(err),() => { cl.OnNext(acc); cl.OnCompleted(); }); 
      }); 
     } 

Chú ý: Tôi đã không kiểm tra nó, nhưng tôi hy vọng nó cho bạn ý tưởng.

12

Điều này có thể bằng cách kết hợp các phương pháp được xây dựng trong WindowThrottle của Observable. Trước tiên, chúng ta hãy giải quyết vấn đề đơn giản hơn khi chúng ta bỏ qua điều kiện đếm tối đa:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay) 
{ 
    var closes = stream.Throttle(delay); 
    return stream.Window(() => closes).SelectMany(window => window.ToList()); 
} 

Mạnh mẽ nâng lên. Giờ đây, thật dễ dàng để xem cách thêm số lượng tối đa:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null) 
{ 
    var closes = stream.Throttle(delay); 
    if (max != null) 
    { 
     var overflows = stream.Where((x,index) => index+1>=max); 
     closes = closes.Merge(overflows); 
    } 
    return stream.Window(() => closes).SelectMany(window => window.ToList()); 
} 

Tôi sẽ viết một bài giải thích điều này trên blog của mình. https://gist.github.com/2244036

Tài liệu dành cho phương pháp cửa sổ:

+0

Với kịch bản BufferUntilInactive ở trên - Nếu thuê bao chậm hơn nhà sản xuất, bạn có thể thấy một kịch bản mà tập tiếp theo của các mục cửa sổ sẽ được đệm và sẽ không được đẩy tới thuê bao trừ khi một mục được tạo ... –

+0

Tôi đã đính kèm một mẫu http://snipt.org/Bhao0. Trong phòng thu trực quan (1) mở cửa sổ đầu ra (2) Kiểm tra nút treo (3) Nhấp vào nút (4) chờ cho nó để in "Click bây giờ" trên bàn điều khiển. (5) nhấn nút ba lần, bạn sẽ thấy ba nhấp chuột bị bỏ qua. –

3

Với Rx Extensions 2.0, bạn có thể trả lời tất cả các yêu cầu với một bộ đệm mới quá tải chấp nhận một thời gian chờ và kích thước:

this.subscription = this.dataService 
    .Where(x => !string.Equals("FOO", x.Key.Source)) 
    .Buffer(TimeSpan.FromMilliseconds(100), 1) 
    .ObserveOn(this.dispatcherService) 
    .Where(x => x.Count != 0) 
    .Subscribe(this.OnBufferReceived); 

Xem https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx để biết tài liệu.

+0

Nhưng điều này sẽ không có cửa sổ trượt, với loại hành vi 'gỡ rối' đã được yêu cầu? – Cocowalla

+0

@Cocowalla Tôi đọc lại câu hỏi gốc và mã tôi đã cung cấp không đáp ứng tất cả các yêu cầu. Tôi đã sử dụng điều này trong mã sản xuất với thành công lớn. –

+0

Xin lỗi, ý tôi là cụ thể là hành vi gỡ lỗi: "Tôi muốn đặt lại hẹn giờ nếu một mục dữ liệu khác được nhận" - Tôi không thấy mã của bạn thực hiện điều này? AFAICS, mã của bạn sẽ luôn luôn đẩy bộ đệm đến người đăng ký 100ms một lần (miễn là nó không trống) – Cocowalla

Các vấn đề liên quan