2012-07-01 24 views
6

Tôi có một loạt các sự kiện sắp tới và tôi phải thực hiện TẤT CẢ chúng mà không bị mất, nhưng tôi muốn đảm bảo rằng chúng được đệm và tiêu thụ tại các khe thời gian thích hợp. Bất cứ ai có một giải pháp?Cách tốt nhất để "giới hạn tốc độ" tiêu thụ của một Đài quan sát là gì?

Tôi không thể tìm thấy bất kỳ toán tử nào trong Rx có thể làm điều đó mà không làm mất các sự kiện (Throttle - mất sự kiện). Tôi cũng được coi là Buffered, Delay, etc ... Không thể tìm ra giải pháp tốt.

Tôi đã cố gắng để đặt một bộ đếm thời gian ở giữa, nhưng bằng cách nào đó nó không hoạt động tại tất cả:

GetInitSequence() 
      .IntervalThrottle(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime) 
    { 
     return Observable.Create<T>(o => 
      { 
       return source.Subscribe(x => 
        { 
         new Timer(state => 
          o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1)); 
        }, o.OnError, o.OnCompleted); 
     }); 
    } 
+0

Bạn có thể thêm bằng đá cẩm thạch digram cho thấy những gì bạn có, và những gì bạn muốn? Giống như những người khác, tôi không chắc chắn những gì bạn đang cố gắng đạt được, như tôi nghĩ Buffer là tốt. –

+0

Bạn giới hạn điều gì? – Fredrick

Trả lời

10

Câu hỏi đặt ra không phải là 100% rõ ràng vì vậy tôi làm cho một số giả định.

Observable.Delay không phải là những gì bạn muốn vì điều đó sẽ tạo ra sự chậm trễ khi mỗi sự kiện đến, thay vì tạo khoảng thời gian để xử lý.

Observable.Buffer không phải là những gì bạn muốn vì điều đó sẽ khiến tất cả các sự kiện trong mỗi khoảng thời gian nhất định được chuyển cho bạn, thay vì mỗi lần một lần.

Vì vậy, tôi tin rằng bạn đang tìm kiếm giải pháp tạo ra một số loại nhịp số đánh dấu đi và cung cấp cho bạn sự kiện trên mỗi lần đánh dấu. Điều này có thể ngây thơ xây dựng bằng Observable.Interval cho nhịp và Zip để kết nối nó với nguồn của bạn:

var source = GetInitSequence(); 
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));  
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now)); 

này sẽ kích hoạt mỗi 5 giây (trong ví dụ trên), và cung cấp cho bạn các mặt hàng ban đầu theo thứ tự.

Vấn đề duy nhất với giải pháp này là nếu bạn không có thêm bất kỳ yếu tố nguồn nào (nói) 10 giây, khi các yếu tố nguồn đến, chúng sẽ được gửi ngay lập tức vì một số sự kiện 'kích hoạt' đang ở đó chờ họ. Biểu đồ đá cẩm thạch cho trường hợp đó:

source: -a-b-c----------------------d-e-f-g 
trigger: ----o----o----o----o----o----o----o 
result: ----a----b----c-------------d-e-f-g 

Đây là vấn đề rất hợp lý.Có hai câu hỏi ở đây đã có giải quyết nó:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

Giải pháp cung cấp là một phương pháp Drain mở rộng chính và Buffered mở rộng thứ cấp. Tôi đã sửa đổi những thứ này đơn giản hơn nhiều (không cần Drain, chỉ cần sử dụng Concat). Cách sử dụng:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5)); 

Các phương pháp khuyến nông StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay) 
{ 
    return source.Select(x => 
     Observable.Empty<T>() 
      .Delay(minDelay) 
      .StartWith(x) 
    ).Concat(); 
} 
+0

Cảm ơn. Vẫn không thực sự những gì tôi đang tìm kiếm nhưng nó cho tôi thấy một số ý tưởng. Tôi chỉ thất vọng với Rx - tại sao nó phải phức tạp và không có tài liệu thích hợp. Đường cong học tập dốc và yêu cầu kiến ​​thức rộng về chủ đề để đạt được điều gì đó có giá trị. #fail – IgorM

+1

Đồng ý. Đó là lý do tại sao tôi đã dành rất nhiều thời gian viết IntroToRx.com để giúp mọi người ở vị trí của bạn. Thật khó, và có rất nhiều thứ để học. –

+0

Tôi thực sự thấy các toán tử Rx này khó đọc và lý do. Tôi nghĩ đó là giới hạn của tôi - có lẽ vì tôi có một tâm trí trực quan và tôi không thể hình dung được kết quả. Có cơ hội nhận được một sơ đồ đá cẩm thạch cho những gì các mã trong câu trả lời này không? –

0

Làm thế nào về Observable.Buffer? Điều này sẽ trả về tất cả các sự kiện trong cửa sổ 1s dưới dạng một sự kiện duy nhất.

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100)); 
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5)); 
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); }); 

Nó có thể là những gì bạn đang yêu cầu là không rõ ràng. Mã của bạn phải làm gì? Có vẻ như bạn chỉ đang trì hoãn bằng cách tạo bộ hẹn giờ cho từng sự kiện. Nó cũng phá vỡ ngữ nghĩa của các quan sát như tiếp theo và hoàn thành có thể xảy ra trước khi tiếp theo.

Lưu ý điều này cũng chỉ chính xác ở bộ hẹn giờ được sử dụng. Thông thường các bộ đếm thời gian chính xác đến tối đa 16ms.

Edit:

ví dụ của bạn sẽ trở thành, và mục chứa tất cả các sự kiện trong cửa sổ:

GetInitSequence() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(
       item => 
        { 
         Console.WriteLine(DateTime.Now); 
         // Process item 
        } 
      ); 
1

Tôi biết điều này chỉ có thể là quá đơn giản, nhưng sẽ làm việc này?

var intervaled = source.Do(x => { Thread.Sleep(100); }); 

Về cơ bản, điều này chỉ đặt độ trễ tối thiểu giữa các giá trị. Quá đơn giản?

+0

Điều này phù hợp và sửa chữa hành vi của OPS IntervalThrottle là điều này thực sự hợp lý? –

+1

Eeek ... chặn chủ đề !? Đó là loại bay khi đối mặt với hiệu trưởng Rx phải không? –

+0

Vâng, trong đó ý nghĩa tinh khiết nhất của nó là chống lại Rx, nhưng yêu cầu là chặn –

1

Dọc theo dòng câu trả lời Enigmativity, nếu tất cả các bạn muốn làm chỉ là trễ tất cả các giá trị bằng một TimeSpan, tôi không thể hiểu tại sao Delay không phải là các nhà điều hành bạn muốn

GetInitSequence() 
     .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here 
     .Subscribe(
      item => 
       { 
        Console.WriteLine(DateTime.Now); 
        // Process item 
       } 
     ); 
Các vấn đề liên quan