2012-07-12 16 views
11

Một trường hợp sử dụng mà tôi đã gặp, và tôi nghi ngờ tôi không thể là người duy nhất, là một phương pháp như:Có phương pháp Rx nào để lặp lại giá trị trước đó định kỳ khi không có giá trị nào đến không?

IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod); 

mà sẽ trả lại tất cả các mục trong tương lai từ những quan sát bên trong, nhưng cũng có thể, nếu bên trong quan sát không gọi OnNext trong một khoảng thời gian nhất định (maxQuietPeriod), nó chỉ lặp lại giá trị cuối cùng (cho đến khi tất nhiên bên trong gọi OnCompleted hoặc OnError).

Một biện minh cho dịch vụ định kỳ sẽ ping cập nhật trạng thái định kỳ. Ví dụ:

var myStatus = Observable.FromEvent(
    h=>this.StatusUpdate+=h, 
    h=>this.StatusUpdate-=h); 

var messageBusStatusPinger = myStatus 
    .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1)) 
    .Subscribe(update => _messageBus.Send(update)); 

Có điều gì giống như vậy không? Hay tôi ước lượng quá mức tính hữu ích của nó?

Cảm ơn, Alex

PS: Tôi xin lỗi vì bất kỳ thuật ngữ/cú pháp không chính xác, như tôi chỉ mới khám phá Rx cho lần đầu tiên.

+0

Tần suất lặp lại liên tục? – Asti

+0

@Asti: ý định là điều này nên được chỉ định bởi maxQuietPeriod. Nếu chuỗi bên trong không tạo ra giá trị cho maxQuietPeriod, thì giá trị lặp lại sẽ được tạo ra. – AlexC

Trả lời

5

giải pháp tương tự để Matthew, nhưng ở đây các timer bắt đầu sau khi mỗi phần tử được nhận trong nguồn, mà tôi nghĩ là chính xác hơn (tuy nhiên sự khác biệt có thể không quan trọng):

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod) 
{  
    return inner.Select(x => 
     Observable.Interval(maxQuietPeriod) 
        .Select(_ => x) 
        .StartWith(x) 
    ).Switch(); 
} 

Và kiểm tra:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1") 
         .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2")) 
         .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3")); 

source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine); 

Bạn sẽ thấy 1 in 10 lần (5 từ nguồn, 5 lặp đi lặp lại trong im lặng), sau đó rất nhiều 2 khi bạn nhận được một từ mã nguồn và 4 nhiều hơn từ sự im lặng giữa mỗi, theo sau là vô hạn 3.

+0

Điều này trông giống như những gì tôi đã sau, cảm ơn. – AlexC

-1

Không có phương thức nào trong thư viện Rx, nhưng tôi cũng cần có phương pháp như vậy. Trong trường hợp sử dụng của tôi, tôi cần thiết để xuất các giá trị ngay cả khi nguồn không đưa ra bất kỳ giá trị nào. Nếu bạn không muốn đưa ra bất kỳ giá trị nào cho đến khi giá trị nguồn đầu tiên đi qua, bạn có thể xóa thông số defaultValue và gọi tới số createTimer() trước cuộc gọi đăng ký.

Trình lên lịch là cần thiết để chạy hẹn giờ. Một quá tải rõ ràng sẽ là một trong đó không có một lịch trình và chọn một lịch trình mặc định (tôi sử dụng các Scheduler ThreadPool).

Imports System.Reactive 
Imports System.Reactive.Concurrency 
Imports System.Reactive.Disposables 
Imports System.Reactive.Linq 

<Extension()> 
Public Function AtLeastEvery(Of T)(source As IObservable(Of T), 
            timeout As TimeSpan, 
            defaultValue As T, 
            scheduler As IScheduler 
           ) As IObservable(Of T) 
    If source Is Nothing Then Throw New ArgumentNullException("source") 
    If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler") 
    Return Observable.Create(
     Function(observer As IObserver(Of T)) 
      Dim id As ULong 
      Dim gate As New Object() 
      Dim timer As New SerialDisposable() 
      Dim lastValue As T = defaultValue 

      Dim createTimer As Action = 
       Sub() 
        Dim startId As ULong = id 
        timer.Disposable = scheduler.Schedule(timeout, 
              Sub(self As Action(Of TimeSpan)) 
               Dim noChange As Boolean 
               SyncLock gate 
                noChange = (id = startId) 
                If noChange Then 
                 observer.OnNext(lastValue) 
                End If 
               End SyncLock 
               'only restart if no change, otherwise 
               'the change restarted the timeout 
               If noChange Then self(timeout) 
              End Sub) 
       End Sub 
      'start the first timeout 
      createTimer() 
      'subscribe to the source observable 
      Dim subscription = source.Subscribe(
       Sub(v) 
        SyncLock gate 
         id += 1UL 
         lastValue = v 
        End SyncLock 
        observer.OnNext(v) 
        createTimer() 'reset the timeout 
       End Sub, 
       Sub(ex) 
        SyncLock gate 
         id += 1UL 
        End SyncLock 
        observer.OnError(ex) 
        'do not reset the timeout, because the sequence has ended 
       End Sub, 
       Sub() 
        SyncLock gate 
         id += 1UL 
        End SyncLock 
        observer.OnCompleted() 
        'do not reset the timeout, because the sequence has ended 
       End Sub) 

      Return New CompositeDisposable(timer, subscription) 
     End Function) 
End Function 
1

Tôi nghĩ rằng đây không những gì bạn muốn, nếu quan sát của bạn không phải là nóng bạn sẽ cần phải PublishRefcount nó:

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod) 
{ 
    var throttled = inner.Throttle(maxQuietPeriod); 
    var repeating = throttled.SelectMany(i => 
     Observable 
      .Interval(maxQuietPeriod) 
      .Select(_ => i) 
      .TakeUntil(inner)); 
    return Observable.Merge(inner, throttled, repeating); 
} 
+0

Cần lưu ý rằng để làm việc này có thể quan sát được phải nóng - có lẽ thêm phần này ở phía trước: 'var published = inner.Publish().RefCount(); ' – yamen

+0

Chúc mừng, tôi đã chỉnh sửa trong –

5

truy vấn khá đơn giản này không được công việc:

var query = 
    source 
     .Select(s => 
      Observable 
       .Interval(TimeSpan.FromSeconds(1.0)) 
       .StartWith(s) 
       .Select(x => s)) 
     .Switch(); 

Không bao giờ đánh giá thấp sức mạnh của .Switch().

+0

Rất đẹp. Tôi đã gỡ bỏ 'TakeUntil' và thay đổi' Concat' thành 'Switch'. – yamen

Các vấn đề liên quan