2012-06-07 28 views
10

Tôi có một ứng dụng tại một số điểm tăng 1000 sự kiện gần như cùng một lúc. Những gì tôi muốn làm là để bó các sự kiện cho các phần của 50 mặt hàng và bắt đầu xử lý chúng sau mỗi 10 giây. Không cần phải đợi một lô hoàn thành trước khi bắt đầu xử lý hàng loạt mới.Tiện ích mở rộng phản ứng: Xử lý sự kiện theo lô + thêm độ trễ giữa mỗi lô

Ví dụ:

10:00:00: 10000 new events received 
10:00:00: StartProcessing (events.Take(50)) 
10:00:10: StartProcessing (events.Skip(50).Take(50)) 
10:00:15: StartProcessing (events.Skip(100).Take(50)) 

Bất kỳ ý tưởng làm thế nào để đạt được điều này? Tôi cho rằng Reactive Extensions là cách để đi nhưng các giải pháp khác cũng được chấp nhận.

tôi đã cố gắng để bắt đầu từ đây:

 var bufferedItems = eventAsObservable 
      .Buffer(15) 
      .Delay(TimeSpan.FromSeconds(5) 

Nhưng nhận thấy rằng sự chậm trễ đã không làm việc như tôi hy vọng và thay vào đó tất cả các lô bắt đầu cùng một lúc, mặc dù 5 giây trì hoãn.

Tôi cũng đã thử nghiệm phương pháp Window, nhưng tôi không nhận thấy bất kỳ sự khác biệt nào về hành vi. Tôi cho rằng TimeSpan trong Window thực sự có nghĩa là "tận dụng mọi sự kiện mà xảy ra trong 10 giây tiếp theo:.

 var bufferedItems = eventAsObservable 
      .Window(TimeSpan.FromSeconds(10), 5) 
      .SelectMany(x => x) 
      .Subscribe(DoProcessing); 

Tôi đang sử dụng Rx-Main 2.0.20304-beta

Trả lời

20

Nếu bạn không muốn ngủ đề, bạn có thể làm điều này:

var tick = Observable.Interval(TimeSpan.FromSeconds(5)); 

eventAsObservable 
.Buffer(50) 
.Zip(tick, (res, _) => res) 
.Subscribe(DoProcessing); 
+1

Tôi ước tôi có thể upvote điều này nhiều lần! Chỉ cần dành ba giờ để tìm ra. –

1

Hãy thử điều này:

var bufferedItems = eventAsObservable 
     .Buffer(50) 
     .Do(_ => { Thread.Sleep(10000); }); 
Các vấn đề liên quan