2010-09-08 41 views
6

Đây có thể là một câu hỏi ngu ngốc như tôi là một chút mới để RX :)Thay đổi khoảng thời gian của các toán tử RX?

Tôi đang lấy mẫu một sự kiện (RX cho Net 4.0):

eventAsObservable.Sample (TimeSpan.FromSeconds (1)) Dấu thời gian() Đăng ký (x => Console.WriteLine ("test:" + x.Value.EventArgs.str)); Vấn đề là thời gian lấy mẫu cần có khả năng thay đổi khi đang bay, tôi đoán tôi có thể làm cho một số tài sản loại bỏ trình xử lý hiện có và tạo một trình xử lý mới khi nó thay đổi, nhưng có vẻ hơi lộn xộn và hơn thế nữa dễ bị các vấn đề về thời gian. Có cách nào đơn giản thay đổi khoảng thời gian không?

Ví dụ: Giả sử rằng ai đó đang viết một chuỗi ký tự, khi một trình tự nhất định được phát hiện bạn muốn thay đổi thời gian lấy mẫu mà không bỏ sót một sự kiện, và tốt nhất là bằng cách không nhận được một sự kiện nhiều hơn một lần

+0

kịch bản của bạn là gì? –

+0

Tự động hoàn thành nhưng khoảng thời gian lấy mẫu khác nhau tùy thuộc vào nguồn dữ liệu (vì tra cứu cục bộ nhanh hơn dịch vụ web chẳng hạn) – Homde

Trả lời

7

tôi không biết cách thay đổi khoảng thời gian lấy mẫu hiện tại, nhưng những gì bạn có thể làm là lấy mẫu ở tần suất cao nhất bạn cần và sau đó lọc theo mệnh đề Where sử dụng biến số bạn có thể thay đổi.

Ví dụ:

static IObservable<T> SampleEvery<T>(this IObservable<T> source, 
    Func<int> multipleProvider) 
{ 
    int counter = 0; 
    Func<T, bool> predicate = ignored => { 
     counter++; 
     if (counter >= multipleProvider()) 
     { 
      counter = 0; 
     } 
     return counter == 0; 
    }; 
    return source.Where(predicate); 
} 

Bạn muốn sau đó gọi nó như thế này:

// Keep this somewhere you can change it 
int multiple = 1; 

eventAsObservable.Sample(TimeSpan.FromSeconds(1)) 
       .SampleEvery(() => multiple) 
       .Timestamp() 
       .Subscribe(x => Console.WriteLine("testing:" + 
                x.Value.EventArgs.str)); 

Bây giờ, thay đổi giá trị của multiple sẽ thay đổi tần số lấy mẫu có hiệu quả.

Đó là một hack khá xấu, nhưng tôi nghĩ rằng nó sẽ làm việc.

+0

Bạn có thiếu() trên "if (counter> = multipleProvider)" –

+0

@Paul: Có, oops. Đang sửa. –

+0

Dường như một giải pháp khả thi, tôi sẽ thử nghiệm một chút, cảm ơn! Tôi tự hỏi những gì một giải pháp tốt sẽ được cho các kịch bản như thế này ... có lẽ là có thể gửi trong một phương pháp/lambda trả về một khoảng thời gian thay vì thời gian thực tế. Nó không có vẻ xa vời mà bạn muốn thay đổi các thông số cho các toán tử khác nhau trên bay – Homde

0

Tại sao bạn không chỉ đăng ký hai lần?

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)), 
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x)), 
).Subscribe(Console.WriteLine); 

Hoặc nếu tìm kiếm chỉ hoạt động dựa trên một số loại tiền tố hoặc vòng loại như Google Chrome '?' nhà điều hành:

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)), 
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x).SelectMany(x => doRemoteLookup(x)), 
).Subscribe(Console.WriteLine); 
+0

Khoảng thời gian có thể là bất kỳ giá trị nào, nó có thể tùy chỉnh trên mỗi nguồn . Các giải pháp tốt nhất sofar là tạo ra một thuê bao mới và sau đó xử lý cũ, có một cơ hội nhỏ của sự kiện được kích hoạt hai lần mặc dù. Nó sẽ là tuyệt vời nếu có một cách để truy cập vào tài sản thực tế mặc dù – Homde

+0

Những gì tôi nói mặc dù là, bạn chỉ có thể giữ cho tất cả chúng chạy, nhưng chuyển đổi đầu ra của họ thông qua một mệnh đề Where. Có lẽ tôi không hoàn toàn lúng túng chính xác những gì bạn đang làm ... –

5

Tôi biết câu hỏi này đã được trả lời, nhưng tôi nghĩ tôi sẽ thêm một vài cách khác để giải quyết nó theo cách Rx.

Bạn có thể sử dụng Switch trên một chuỗi các TimeSpan 's:

private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>(); 

sampleFrequencies 
    .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp()) 
    .Switch() 
    .Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str)); 

// To change: 
// sampleFrequencies.OnNext(TimeSpan.FromSeconds(5)); 

Ngoài ra, nó cũng có thể được giải quyết bằng Defer, TakeUntilRepeat (cái này là điên chút và được bao gồm như là một bài tập suy nghĩ):

private TimeSpan sampleFrequency = TiemSpan.FromSeconds(2); 
private Subject<Unit> frequencyChanged = new Subject<Unit>(); 

(Observable 
    .Defer(() => eventAsObservable 
     .Sample(Observable.Interval(sampleFrequency) 
    ) 
    .Timestamp() 
    .TakeUntil(frequencyChanged) 
).Repeat() 
.Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str)); 

// To change: 
// sampleFrequency = TimeSpan.FromSeconds(5); 
// frequencyChanged.OnNext(new Unit()); 
+0

Thực tế, tôi đã làm một tài sản tạo ra một thuê bao mới và sau đó xử lý một gói cũ. Có một cơ hội nhỏ của nó bắn một sự kiện hai lần tôi nghĩ nhưng tôi nghĩ rằng nguy cơ giới thiệu lỗi/chi phí với các phương pháp khác làm cho nó trở thành giải pháp hấp dẫn nhất. Tôi có thể ví dụ muốn thay đổi mẫu để Throttle. Các giải pháp khác có vẻ hơi quá nhiều, nhưng tôi đánh giá cao sự trợ giúp! – Homde

+0

Cả hai giải pháp của tôi vẫn sử dụng mẫu, vì vậy nó có thể dễ dàng được thay đổi để Throttle (các công cụ dấu thời gian đã được thực hiện trực tiếp từ yêu cầu của bạn). Phiên bản Switch hiện khá nhiều những gì bạn đang làm bây giờ (hủy bỏ, khởi động lại), nhưng từ bên trong Switch. –

+0

Giải pháp tuyệt vời với Switch, cảm ơn. –

2

TL; DR: Tạo một sử dụng ObservableFromIntervalFunctor Quan sát, như được hiển thị dưới đây:

void Main() 
{ 
    // Pick an initial period, it can be changed later. 
    var intervalPeriod = TimeSpan.FromSeconds(1); 

    // Create an observable using a functor that captures the interval period. 
    var o = ObservableFromIntervalFunctor(() => intervalPeriod); 

    // Log every value so we can visualize the observable. 
    o.Subscribe(Console.WriteLine); 

    // Sleep for a while so you can observe the observable. 
    Thread.Sleep(TimeSpan.FromSeconds(5.0)); 

    // Changing the interval period will takes effect on next tick. 
    intervalPeriod = TimeSpan.FromSeconds(0.3); 

} 

IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor) 
{ 
    return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor()); 
} 

Giải thích: Observable.Generate có tình trạng quá tải cho phép bạn xác định thời gian khi giá trị tiếp theo sẽ được tạo ra thông qua một functor. Bằng cách truyền một hàm functor đã nắm bắt một biến thời gian, bạn có thể làm cho các quan sát được.thay đổi khoảng thời gian bằng cách thay đổi biến thời gian đã ghi.

LINQPad đoạn mã here

+0

Lưu ý: Nếu khoảng thời gian được thay đổi từ chậm sang nhanh hơn, cách tiếp cận này phải đợi cho đến khi khoảng thời gian chậm hoàn thành trước khi thay đổi khoảng thời gian có hiệu lực. Điều này trái với câu trả lời của Jon Skeet, điều này sẽ có hiệu lực sau thời gian lấy mẫu tối thiểu. – r590

Các vấn đề liên quan