2013-08-27 47 views
5

Tôi có một kịch bản tương tác người dùng mà tôi muốn xử lý với Rx.Rx và tác vụ - hủy tác vụ đang chạy khi tác vụ mới được sinh ra?

Kịch bản tương tự như các kinh điển "khi dùng ngừng nhập làm một số công việc" (thường, tìm kiếm những gì người dùng đã gõ cho đến nay) (1) - nhưng tôi cũng cần phải:

  • (2) chỉ nhận được mới nhất kết quả của đơn vị "làm một số công việc" (xem bên dưới)
  • (3) khi một đơn vị công việc mới bắt đầu, hủy bất kỳ công việc nào đang diễn ra (trong trường hợp của tôi là CPU chuyên sâu)

Đối với (1), tôi sử dụng IObservable cho các sự kiện của người dùng, được điều chỉnh với .Throttle() để chỉ kích hoạt khi tạm dừng giữa các sự kiện ("người dùng ngừng nhập").

Từ đó, i .Select(_ => CreateMyTask(...).ToObservable()).

Điều này mang lại cho tôi một IObservable<IObservable<T>> trong đó mỗi quan sát bên trong kết thúc tốt đẹp một tác vụ duy nhất.

Để nhận (2) Cuối cùng, tôi áp dụng .Switch() để chỉ nhận kết quả từ đơn vị công việc mới nhất.

Còn (3) - hủy các tác vụ đang chờ xử lý?

Nếu tôi hiểu đúng, bất cứ khi nào có một mới nội IObservable<T>, phương pháp .Switch() đặt mua nó và unsubscribes từ trước đó (s), khiến chúng Dispose().
Có thể có thể bằng cách nào đó có dây để kích hoạt tác vụ hủy?

Trả lời

3

Bạn có phải làm việc với Công việc không?

Nếu bạn vui khi làm việc hoàn toàn với Đài quan sát thì bạn có thể làm điều này một cách độc đáo.

Hãy thử làm một cái gì đó như thế này:

var query = 
    Observable.Create<int>(o => 
    { 
     var cancelling = false; 
     var cancel = Disposable.Create(() => 
     { 
      cancelling = true; 
     }); 
     var subscription = Observable.Start(() => 
     { 
      for (var i = 0; i < 100; i++) 
      { 
       Thread.Sleep(10); //1000 ms in total 
       if (cancelling) 
       { 
        Console.WriteLine("Cancelled on {0}", i); 
        return -1; 
       } 
      } 
      Console.WriteLine("Done"); 
      return 42; 
     }).Subscribe(o); 
     return new CompositeDisposable(cancel, subscription); 
    }); 

quan sát này được thực hiện một số công việc khó khăn trong vòng lặp for với Thread.Sleep(10);, nhưng khi quan sát được xử lý vòng lặp được thoát và công việc CPU thâm chấm dứt. Sau đó, bạn có thể sử dụng tiêu chuẩn Rx Dispose với số Switch để hủy công việc đang tiến hành.

Nếu bạn muốn rằng gói lên trong một phương pháp, sau đó thử này:

public static IObservable<T> Start<T>(Func<Func<bool>, T> work) 
{ 
    return Observable.Create<T>(o => 
    { 
     var cancelling = false; 
     var cancel = Disposable 
      .Create(() => cancelling = true); 
     var subscription = Observable 
      .Start(() => work(() => cancelling)) 
      .Subscribe(o); 
     return new CompositeDisposable(cancel, subscription); 
    }); 
} 

Và sau đó gọi nó với một chức năng như thế này:

Func<Func<bool>, int> work = cancelling => 
{ 
    for (var i = 0; i < 100; i++) 
    { 
     Thread.Sleep(10); //1000 ms in total 
     if (cancelling()) 
     { 
      Console.WriteLine("Cancelled on {0}", i); 
      return -1; 
     } 
    } 
    Console.WriteLine("Done"); 
    return 42; 
}; 

Dưới đây là mã của tôi là chứng minh điều này đã làm việc:

var disposable = 
    ObservableEx 
     .Start(work) 
     .Subscribe(x => Console.WriteLine(x)); 

Thread.Sleep(500); 
disposable.Dispose(); 

Tôi nhận "Đã hủy trên 50" (đôi khi "Bị hủy trên 51") làm đầu ra của mình.

+0

Không, tôi không có yêu cầu sử dụng Tác vụ. Nó chỉ cảm thấy tự nhiên để đóng gói hành động xử lý chuyên sâu trong một. Tôi sẽ xem xét giải pháp của bạn :) –

+0

@CristiDiaconescu - Thành thật mà nói tôi có thể thấy những gì TPL đã làm cho nó, nhưng tôi đã luôn luôn thấy rằng các giải pháp của tôi bằng cách sử dụng Rx luôn luôn nhiều neater và nhiều hơn nữa thể hiện. Tôi cố gắng tránh TPL ủng hộ Rx. – Enigmativity

+0

Nếu bạn đang sử dụng Rx, phương thức 'Start' được đề xuất của bạn có thể dễ hiểu hơn như' Bắt đầu (Func work) 'và sử dụng' CancellationDisposable' để tạo mã thông báo thay vì dùng một lần tùy chỉnh. Hoặc, nếu bạn * thực sự * ôm Rx: 'Bắt đầu (Func , T> công việc)' nơi phương thức 'Bắt đầu' của bạn cho nó một' AsyncSubject' mà nó báo hiệu hủy khi đăng ký được xử lý. 'Func ' chỉ cảm thấy khó khăn và không theo tinh thần của Rx. – Brandon

12

Bạn chỉ có thể sử dụng Observable.FromAsync mà sẽ tạo ra thẻ bị hủy khi unsubcribes quan sát:

input.Throttle(...) 
    .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token))) 
    .Switch() 
    .Subscribe(...); 

này sẽ tạo ra một thẻ mới cho mỗi đơn vị công việc và hủy bỏ nó mỗi lần Switch chuyển sang một mới .

+1

Cảm ơn! Tôi không biết về phương thức Factory này. Nó có vẻ thích hợp hơn với toán tử 'ToObservable()' do sự hỗ trợ hủy bỏ của nó. –

+0

vâng tôi mới phát hiện ra nó cách đây vài tuần. Lệnh cấm của Rx là thiếu tài liệu mạnh mẽ. – Brandon

+0

@ ebook của Brandon Lee * là * tài liệu hướng dẫn :) –

Các vấn đề liên quan