2013-09-24 45 views
18

Trong sách IntroToRx tác giả đề xuất viết thử lại "thông minh" cho I/O để thử lại yêu cầu I/O, như yêu cầu mạng sau một khoảng thời gian.Viết phương thức mở rộng Rx "RetryAfter"

Dưới đây là đoạn chính xác:

Một phương pháp mở rộng hữu ích để thêm vào thư viện riêng của bạn có thể là một phương pháp "Back Tắt và Thử lại". Các nhóm tôi đã làm việc với đã tìm thấy một tính năng hữu ích khi thực hiện I/O, đặc biệt là các yêu cầu mạng. Khái niệm là để thử và khi không đợi trong một khoảng thời gian nhất định và thì thử lại. Phiên bản của phương pháp này có thể tính đến loại ngoại lệ mà bạn muốn thử lại, cũng như số tối đa số lần thử lại. Bạn thậm chí có thể muốn kéo dài thời gian chờ đợi để ít hung hăng hơn cho mỗi lần thử lại tiếp theo.

Thật không may, tôi không thể tìm ra cách viết phương pháp này. :(

Trả lời

29

Chìa khóa để thực hiện lại việc thử lại này là deferred observables. Một quan sát được hoãn lại sẽ không thực thi nhà máy của nó cho đến khi ai đó đăng ký với nó và nó sẽ gọi nhà máy cho mỗi đăng ký, làm cho nó trở nên lý tưởng cho việc thử lại của chúng tôi kịch bản.

Giả sử chúng ta có một phương pháp mà gây nên một yêu cầu mạng.

public IObservable<WebResponse> SomeApiMethod() { ... } 

theo mục đích của đoạn này chút, chúng ta hãy xác định trì hoãn như source

var source = Observable.Defer(() => SomeApiMethod()); 

Bất cứ khi nào ai đó đăng ký nguồn, nó sẽ gọi SomeApiMethod và khởi chạy một yêu cầu web mới. Cách ngây thơ để thử lại nó bất cứ khi nào nó không thành công sẽ được sử dụng trong xây dựng trong nhà điều hành thử lại.

source.Retry(4) 

Điều đó sẽ không tốt cho API mặc dù nó không phải là những gì bạn đang yêu cầu. Chúng ta cần trì hoãn việc khởi chạy các yêu cầu giữa mỗi lần thử. Một cách để làm điều đó là với một delayed subscription.

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4) 

Đó không phải là lý do vì nó sẽ thêm độ trễ ngay cả trong yêu cầu đầu tiên, chúng ta hãy khắc phục điều đó.

int attempt = 0; 
Observable.Defer(() => { 
    return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1))) 
}) 
.Retry(4) 
.Select(response => ...) 

Chỉ tạm dừng cho giây không phải là phương pháp thử lại rất tốt mặc dù vậy hãy thay đổi hằng số thành hàm nhận số lần thử lại và trả về độ trễ thích hợp. Trả về theo hàm mũ là đủ dễ thực hiện.

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); 

((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) 

Chúng tôi gần như hoàn tất ngay bây giờ, chúng tôi chỉ cần thêm cách chỉ định ngoại lệ mà chúng tôi nên thử lại. Hãy thêm một hàm cho một ngoại lệ trả về việc có thử lại hay không, chúng ta sẽ gọi nó là retryOnError.

Bây giờ chúng ta cần viết một số mã tìm kiếm đáng sợ nhưng phải chịu đựng với tôi.

Observable.Defer(() => { 
    return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) 
     .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null)) 
     .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e) 
      ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e) 
      : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e))); 
}) 
.Retry(retryCount) 
.SelectMany(t => t.Item1 
    ? Observable.Return(t.Item2) 
    : Observable.Throw<T>(t.Item3)) 

Tất cả những dấu ngoặc nhọn đang có để sắp xếp một ngoại lệ mà chúng ta không nên thử lại quá khứ .Retry(). Chúng tôi đã làm cho các bên trong quan sát được một IObservable<Tuple<bool, WebResponse, Exception>> nơi bool đầu tiên chỉ ra nếu chúng ta có một phản ứng hoặc một ngoại lệ. Nếu retryOnError chỉ ra rằng chúng ta nên thử lại cho một ngoại lệ đặc biệt, quan sát bên trong sẽ ném và sẽ được chọn bằng cách thử lại. SelectMany chỉ mở gói Tuple của chúng ta và làm cho kết quả có thể quan sát được là IObservable<WebRequest> một lần nữa.

Xem gist with full source and tests của tôi cho phiên bản cuối cùng. Có điều hành này cho phép chúng ta viết mã retry của chúng tôi khá ngắn gọn

Observable.Defer(() => SomApiMethod()) 
    .RetryWithBackoffStrategy(
    retryCount: 4, 
    retryOnError: e => e is ApiRetryWebException 
) 
+0

lớn thứ Markus. –

+1

Dường như với tôi, việc triển khai này không thể hủy đăng ký. Đó là một chút khó khăn để dán ở đây, nhưng hãy thử điều này, bạn sẽ thấy khoảng thời gian giữ ticking: Observable.Interval (TimeSpan.FromSeconds (1)) Do (Console.WriteLine) .RetryWithBackoffStrategy() Take (1). Đăng ký(); –

+0

@NiallConnaughton Đẹp bắt! Lý do mà nó không hủy đăng ký từ nguồn đã phải làm với tôi ban đầu đã mô hình hóa phương pháp sau khi một nhà điều hành nội bộ khác, chúng tôi có, một trong đó sản xuất quan sát nóng. Nhà điều hành này không nên làm điều đó. Tôi đã thay đổi mã để tạo ra các quan sát lạnh và thêm một thử nghiệm để xác minh rằng nó hủy đăng ký. Cảm ơn! –

1

Đây là một thực hiện hơi khác tôi đã đưa ra trong khi nghiên cứu cách Rxx hiện nó. Vì vậy, nó phần lớn là một phiên bản cắt giảm của phương pháp tiếp cận của Rxx.

Chữ ký hơi khác so với phiên bản của Markus. Bạn chỉ định một loại Ngoại lệ để thử lại và chiến lược trì hoãn có ngoại lệ và số lần thử lại, vì vậy bạn có thể bị trì hoãn lâu hơn cho mỗi lần thử lại liên tiếp, v.v.

Tôi không thể đảm bảo bằng chứng lỗi hoặc cách tiếp cận tốt nhất, nhưng có vẻ như nó hoạt động.

public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null) 
where TException : Exception 
{ 
    return Observable.Create<TSource>(observer => 
    { 
     scheduler = scheduler ?? Scheduler.CurrentThread; 
     var disposable = new SerialDisposable(); 
     int retryCount = 0; 

     var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero, 
     self => 
     { 
      var subscription = source.Subscribe(
      observer.OnNext, 
      ex => 
      { 
       var typedException = ex as TException; 
       if (typedException != null) 
       { 
        var retryDelay = delayFactory(typedException, ++retryCount); 
        self(retryDelay); 
       } 
       else 
       { 
        observer.OnError(ex); 
       } 
      }, 
      observer.OnCompleted); 

      disposable.Disposable = subscription; 
     }); 

     return new CompositeDisposable(scheduleDisposable, disposable); 
    }); 
} 
+0

Tôi tìm thấy một trường hợp cạnh mà impl này phá vỡ (trộn lập lịch trình ngay lập tức và CurrentThread): int a = 0; Observable.Defer (() => a ++ <1? Observable.Return (a): Observable.Timer (TimeSpan.Zero, Scheduler.CurrentThread) .SelectMany (Observable.Return (a))) .Concat (Observable.Throw (new Exception())) RetryWithDelay ((ví dụ, i) => TimeSpan.Zero, Scheduler.Immediate) .Đăng ký (i => Console.WriteLine (i)); Một sửa chữa đơn giản để hỗ trợ Scheduler.Immediate sẽ là để kiểm tra xem giá trị đăng ký đã thay đổi trong cuộc gọi Đăng ký() trước khi gán serial-dùng một lần. – blueling

+1

Tùy thuộc vào ngữ nghĩa dự định của hàm RetryWithDelay(), bạn có thể đặt lại hàm retryCount thành 0 trong trình xử lý OnNext, ví dụ: onNext: x => {observer.OnNext (x); retryCount = 0; }. – blueling

10

Có lẽ tôi là qua đơn giản hóa tình hình, nhưng nếu chúng ta nhìn vào thực hiện Thử lại, nó chỉ đơn giản là một Observable.Catch trên một đếm vô hạn của quan sát:

private static IEnumerable<T> RepeatInfinite<T>(T value) 
{ 
    while (true) 
     yield return value; 
} 

public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source) 
{ 
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source)); 
} 

Vì vậy, nếu chúng ta lấy cách tiếp cận này, chúng ta chỉ có thể thêm một sự chậm trễ sau khi sản lượng đầu tiên.

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime) 
{ 
    // Don't delay the first time   
    yield return source; 

    while (true) 
     yield return source.DelaySubscription(dueTime); 
    } 

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime) 
{ 
    return RepeateInfinite(source, dueTime).Catch(); 
} 

Một tình trạng quá tải mà bắt một ngoại lệ cụ thể với một số thử lại có thể còn ngắn gọn hơn:

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception 
{ 
    return source.Catch<TSource, TException>(exception => 
    { 
     if (count <= 0) 
     { 
      return Observable.Throw<TSource>(exception); 
     } 

     return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count); 
    }); 
} 

Lưu ý rằng tình trạng quá tải ở đây là sử dụng đệ quy. Trong lần xuất hiện đầu tiên, có vẻ như một StackOverflowException là có thể nếu đếm là một cái gì đó giống như Int32.MaxValue. Tuy nhiên, DelaySubscription sử dụng bộ lập lịch để chạy hành động đăng ký, vì vậy không thể ngăn xếp ngăn xếp (tức là sử dụng "trampolining"). Tôi đoán điều này là không thực sự rõ ràng bằng cách nhìn vào mã mặc dù. Chúng ta có thể ép buộc tràn ngăn xếp bằng cách thiết lập rõ ràng bộ lập lịch trong quá tải DelaySubscription thành Scheduler.Immediate, và truyền trong TimeSpan.Zero và Int32.MaxValue. Chúng ta có thể vượt qua trong một lịch trình không ngay lập tức bày tỏ ý định của chúng tôi một cách rõ ràng hơn một chút, ví dụ .:

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count); 

UPDATE: Added quá tải để có trong một lịch trình cụ thể.

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source, 
    TimeSpan retryDelay, 
    int retryCount, 
    IScheduler scheduler) where TException : Exception 
{ 
    return source.Catch<TSource, TException>(
     ex => 
     { 
      if (retryCount <= 0) 
      { 
       return Observable.Throw<TSource>(ex); 
      } 

      return 
       source.DelaySubscription(retryDelay, scheduler) 
        .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler); 
     }); 
} 
+0

Bạn cũng có thể thay thế thông số thời gian chờ bằng một nhà máy nếu bạn muốn, như trong các ví dụ trên. –

+0

Tôi thích phiên bản đầu tiên, ít chắc chắn hơn về phiên bản đệ quy. Lý tưởng nhất là bạn muốn cho phép người dùng vượt qua trong Trình lập lịch biểu, tại thời điểm đó bạn không còn có thể đảm bảo người dùng nào sẽ sử dụng. – Benjol

+0

Cảm ơn đã bình luận Benjol. Tôi đồng ý. Tôi thực sự sử dụng một quá tải mà mất một lịch trình (cập nhật mã được thêm ở trên). Bạn đang phải mặc dù thực tế là một người dùng sau đó có thể vượt qua ngay lập tức. Một trong những giải pháp tiềm năng cho điều này có thể là ném một ngoại lệ nếu Scheduler.Immediate được thông qua. –

2

Dưới đây là một trong những Tôi đang sử dụng:

public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay) 
{ 
    Contract.Requires(src != null); 
    Contract.Ensures(Contract.Result<IObservable<T>>() != null); 

    if (delay == TimeSpan.Zero) return src.Retry(); 
    return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry()); 
} 
+0

có bất kỳ sự khác biệt nào giữa 'src.Catch (Observable.Timer (delay) .SelectMany (x => src) .Retry())' và 'src.Catch (src.DelaySubscription (delay) .Retry()) '? – Sabacc

+0

Không giống như vậy. –

+0

Tôi sợ rằng tôi đã bỏ lỡ điều gì đó. Tôi nghĩ DelaySubscription dễ hiểu hơn Observable.Timer + SelectMany. Cảm ơn bạn đã chia sẻ giải pháp đã giúp tôi tìm thấy giải pháp của riêng mình :) – Sabacc

0

Dưới đây là một trong tôi đã đưa ra.

Không muốn ghép các mục của từng lần thử lại thành một chuỗi, nhưng phát ra chuỗi nguồn như một toàn bộ trên mỗi lần thử lại - vì vậy toán tử trả về IObservable<IObservable<TSource>>. Nếu điều này không được mong muốn, nó có thể chỉ đơn giản là Switch() được chỉnh sửa lại thành một chuỗi.

(Bối cảnh: trong trường hợp sử dụng của tôi, nguồn là một chuỗi nóng nóng, mà tôi GroupByUntil một mục xuất hiện để đóng nhóm. Nếu mục này bị mất giữa hai lần thử lại, nhóm sẽ không bao giờ bị đóng, dẫn đến rò rỉ bộ nhớ. có một chuỗi các trình tự cho phép nhóm trên các trình tự bên trong chỉ (hoặc xử lý ngoại lệ hay ...))

/// <summary> 
/// Repeats <paramref name="source"/> in individual windows, with <paramref name="interval"/> time in between. 
/// </summary> 
public static IObservable<IObservable<TSource>> RetryAfter<TSource>(this IObservable<TSource> source, TimeSpan interval, IScheduler scheduler = null) 
{ 
    if (scheduler == null) scheduler = Scheduler.Default; 
    return Observable.Create<IObservable<TSource>>(observer => 
    { 
     return scheduler.Schedule(self => 
     { 
      observer.OnNext(Observable.Create<TSource>(innerObserver => 
      { 
       return source.Subscribe(
        innerObserver.OnNext, 
        ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); }, 
        () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); }); 
      })); 
     }); 
    }); 
} 
+0

btw bài đăng đầu tiên của tôi trên stackoverflow *** – tinudu

2

Dựa trên Markus' câu trả lời tôi đã viết như sau:.

public static class ObservableExtensions 
{ 
    private static IObservable<T> BackOffAndRetry<T>(
     this IObservable<T> source, 
     Func<int, TimeSpan> strategy, 
     Func<int, Exception, bool> retryOnError, 
     int attempt) 
    { 
     return Observable 
      .Defer(() => 
      { 
       var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt); 
       var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay); 

       return s 
        .Catch<T, Exception>(e => 
        { 
         if (retryOnError(attempt, e)) 
         { 
          return source.BackOffAndRetry(strategy, retryOnError, attempt + 1); 
         } 
         return Observable.Throw<T>(e); 
        }); 
      }); 
    } 

    public static IObservable<T> BackOffAndRetry<T>(
     this IObservable<T> source, 
     Func<int, TimeSpan> strategy, 
     Func<int, Exception, bool> retryOnError) 
    { 
     return source.BackOffAndRetry(strategy, retryOnError, 0); 
    } 
} 

tôi thích hơn vì

  • nó không sửa đổi attempts nhưng sử dụng đệ quy.
  • Nó không sử dụng retries nhưng vượt qua số nỗ lực nhằm retryOnError
Các vấn đề liên quan