2013-09-25 37 views
12

Có mẫu được thiết lập đề xuất nào để tự hủy và khởi động lại tác vụ không?Mẫu để tự hủy và khởi động lại tác vụ

Ví dụ: tôi đang làm việc trên API cho trình kiểm tra chính tả nền. Phiên kiểm tra chính tả được bao bọc dưới dạng Task. Mỗi phiên mới phải hủy bỏ phiên bản trước đó và chờ kết thúc (để sử dụng lại đúng tài nguyên như nhà cung cấp dịch vụ kiểm tra chính tả, v.v.).

tôi đã đi lên với một cái gì đó như thế này:

class Spellchecker 
{ 
    Task pendingTask = null; // pending session 
    CancellationTokenSource cts = null; // CTS for pending session 

    // SpellcheckAsync is called by the client app 
    public async Task<bool> SpellcheckAsync(CancellationToken token) 
    { 
     // SpellcheckAsync can be re-entered 
     var previousCts = this.cts; 
     var newCts = CancellationTokenSource.CreateLinkedTokenSource(token); 
     this.cts = newCts; 

     if (IsPendingSession()) 
     { 
      // cancel the previous session and wait for its termination 
      if (!previousCts.IsCancellationRequested) 
       previousCts.Cancel(); 
      // this is not expected to throw 
      // as the task is wrapped with ContinueWith 
      await this.pendingTask; 
     } 

     newCts.Token.ThrowIfCancellationRequested(); 
     var newTask = SpellcheckAsyncHelper(newCts.Token); 

     this.pendingTask = newTask.ContinueWith((t) => { 
      this.pendingTask = null; 
      // we don't need to know the result here, just log the status 
      Debug.Print(((object)t.Exception ?? (object)t.Status).ToString()); 
     }, TaskContinuationOptions.ExecuteSynchronously); 

     return await newTask; 
    } 

    // the actual task logic 
    async Task<bool> SpellcheckAsyncHelper(CancellationToken token) 
    { 
     // do not start a new session if the the previous one still pending 
     if (IsPendingSession()) 
      throw new ApplicationException("Cancel the previous session first."); 

     // do the work (pretty much IO-bound) 
     try 
     { 
      bool doMore = true; 
      while (doMore) 
      { 
       token.ThrowIfCancellationRequested(); 
       await Task.Delay(500); // placeholder to call the provider 
      } 
      return doMore; 
     } 
     finally 
     { 
      // clean-up the resources 
     } 
    } 

    public bool IsPendingSession() 
    { 
     return this.pendingTask != null && 
      !this.pendingTask.IsCompleted && 
      !this.pendingTask.IsCanceled && 
      !this.pendingTask.IsFaulted; 
    } 
} 

Ứng dụng client (UI) chỉ nên có thể gọi SpellcheckAsync nhiều lần như mong muốn, mà không lo lắng về việc hủy một phiên chưa giải quyết. Vòng lặp chính doMore chạy trên luồng giao diện người dùng (vì nó liên quan đến giao diện người dùng, trong khi tất cả các cuộc gọi nhà cung cấp dịch vụ kiểm tra chính tả là IO-bound).

Tôi cảm thấy hơi khó chịu về việc tôi phải chia API thành hai peices, SpellcheckAsyncSpellcheckAsyncHelper, nhưng tôi không thể nghĩ ra cách tốt hơn để làm điều này và chưa được thử nghiệm.

Trả lời

15

Tôi nghĩ khái niệm chung là khá tốt, mặc dù tôi khuyên bạn không nên sử dụng ContinueWith.

tôi chỉ muốn viết nó sử dụng thường xuyên await, và rất nhiều logic "Tôi đã chạy" là không cần thiết:

Task pendingTask = null; // pending session 
CancellationTokenSource cts = null; // CTS for pending session 

// SpellcheckAsync is called by the client app on the UI thread 
public async Task<bool> SpellcheckAsync(CancellationToken token) 
{ 
    // SpellcheckAsync can be re-entered 
    var previousCts = this.cts; 
    var newCts = CancellationTokenSource.CreateLinkedTokenSource(token); 
    this.cts = newCts; 

    if (previousCts != null) 
    { 
     // cancel the previous session and wait for its termination 
     previousCts.Cancel(); 
     try { await this.pendingTask; } catch { } 
    } 

    newCts.Token.ThrowIfCancellationRequested(); 
    this.pendingTask = SpellcheckAsyncHelper(newCts.Token); 
    return await this.pendingTask; 
} 

// the actual task logic 
async Task<bool> SpellcheckAsyncHelper(CancellationToken token) 
{ 
    // do the work (pretty much IO-bound) 
    using (...) 
    { 
     bool doMore = true; 
     while (doMore) 
     { 
      token.ThrowIfCancellationRequested(); 
      await Task.Delay(500); // placeholder to call the provider 
     } 
     return doMore; 
    } 
} 
+0

@Stephen Cleary, tôi rất tôn trọng công việc của bạn trên mọi thứ không đồng bộ, vì vậy xin đừng hiểu nhầm điều này: Tôi chỉ tò mò thôi. Tôi hơi ngạc nhiên khi bạn không viết lại phần 'await this.pendingTask' bằng cách sử dụng 'SemaphoreSlim' hoặc của riêng bạn' AsyncLock' hoặc tương tự. Bạn có thường tin rằng việc cải thiện an toàn luồng trong các phần "đồng bộ" của các phương pháp không đồng bộ là một tối ưu hóa sớm không? –

+0

@KirillShlenskiy: Không có gì sai khi sử dụng 'SemaphoreSlim' hoặc tương tự với giới hạn một lần. –

5

Dưới đây là phiên bản mới nhất của mẫu hủy-and-khởi động lại mà tôi sử dụng:

class AsyncWorker 
{ 
    Task _pendingTask; 
    CancellationTokenSource _pendingTaskCts; 

    // the actual worker task 
    async Task DoWorkAsync(CancellationToken token) 
    { 
     token.ThrowIfCancellationRequested(); 
     Debug.WriteLine("Start."); 
     await Task.Delay(100, token); 
     Debug.WriteLine("Done."); 
    } 

    // start/restart 
    public void Start(CancellationToken token) 
    { 
     var previousTask = _pendingTask; 
     var previousTaskCts = _pendingTaskCts; 

     var thisTaskCts = CancellationTokenSource.CreateLinkedTokenSource(token); 

     _pendingTask = null; 
     _pendingTaskCts = thisTaskCts; 

     // cancel the previous task 
     if (previousTask != null && !previousTask.IsCompleted) 
      previousTaskCts.Cancel(); 

     Func<Task> runAsync = async() => 
     { 
      // await the previous task (cancellation requested) 
      if (previousTask != null) 
       await previousTask.WaitObservingCancellationAsync(); 

      // if there's a newer task started with Start, this one should be cancelled 
      thisTaskCts.Token.ThrowIfCancellationRequested(); 

      await DoWorkAsync(thisTaskCts.Token).WaitObservingCancellationAsync(); 
     }; 

     _pendingTask = Task.Factory.StartNew(
      runAsync, 
      CancellationToken.None, 
      TaskCreationOptions.None, 
      TaskScheduler.FromCurrentSynchronizationContext()).Unwrap(); 
    } 

    // stop 
    public void Stop() 
    { 
     if (_pendingTask == null) 
      return; 

     if (_pendingTask.IsCanceled) 
      return; 

     if (_pendingTask.IsFaulted) 
      _pendingTask.Wait(); // instantly throw an exception 

     if (!_pendingTask.IsCompleted) 
     { 
      // still running, request cancellation 
      if (!_pendingTaskCts.IsCancellationRequested) 
       _pendingTaskCts.Cancel(); 

      // wait for completion 
      if (System.Threading.Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA) 
      { 
       // MTA, blocking wait 
       _pendingTask.WaitObservingCancellation(); 
      } 
      else 
      { 
       // TODO: STA, async to sync wait bridge with DoEvents, 
       // similarly to Thread.Join 
      } 
     } 
    } 
} 

// useful extensions 
public static class Extras 
{ 
    // check if exception is OperationCanceledException 
    public static bool IsOperationCanceledException(this Exception ex) 
    { 
     if (ex is OperationCanceledException) 
      return true; 

     var aggEx = ex as AggregateException; 
     return aggEx != null && aggEx.InnerException is OperationCanceledException; 
    } 

    // wait asynchrnously for the task to complete and observe exceptions 
    public static async Task WaitObservingCancellationAsync(this Task task) 
    { 
     try 
     { 
      await task; 
     } 
     catch (Exception ex) 
     { 
      // rethrow if anything but OperationCanceledException 
      if (!ex.IsOperationCanceledException()) 
       throw; 
     } 
    } 

    // wait for the task to complete and observe exceptions 
    public static void WaitObservingCancellation(this Task task) 
    { 
     try 
     { 
      task.Wait(); 
     } 
     catch (Exception ex) 
     { 
      // rethrow if anything but OperationCanceledException 
      if (!ex.IsOperationCanceledException()) 
       throw; 
     } 
    } 
} 

thử nghiệm sử dụng (sản xuất duy nhất một "Start/Done" đầu ra cho DoWorkAsync):

private void MainForm_Load(object sender, EventArgs e) 
{ 
    var worker = new AsyncWorker(); 
    for (var i = 0; i < 10; i++) 
     worker.Start(CancellationToken.None); 
} 
+0

Phiên bản mới hơn và chức năng của mẫu này có tại đây: http://stackoverflow.com/a/21427264/1768303 – Noseratio

0

Hy vọng điều này sẽ hữu ích - đã cố gắng để tạo ra lớp Helper có thể được tái sử dụng:

class SelfCancelRestartTask 
{ 
    private Task _task = null; 
    public CancellationTokenSource TokenSource { get; set; } = null; 

    public SelfCancelRestartTask() 
    { 
    } 

    public async Task Run(Action operation) 
    { 
     if (this._task != null && 
      !this._task.IsCanceled && 
      !this._task.IsCompleted && 
      !this._task.IsFaulted) 
     { 
      TokenSource?.Cancel(); 
      await this._task; 
      TokenSource = new CancellationTokenSource(); 
     } 
     else 
     { 
      TokenSource = new CancellationTokenSource(); 
     } 
     this._task = Task.Run(operation, TokenSource.Token); 
    } 
0

Các ví dụ dường như trên để có vấn đề khi các phương pháp không đồng bộ được gọi là nhiều lần một cách nhanh chóng sau mỗi khác, ví dụ Bốn lần. Sau đó, tất cả các cuộc gọi tiếp theo của phương thức này hủy nhiệm vụ đầu tiên và cuối cùng ba nhiệm vụ mới được tạo ra chạy cùng một lúc. Vì vậy, tôi đã đưa ra điều này:

private List<Tuple<Task, CancellationTokenSource>> _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>(); 

    /// <remarks>This method is asynchronous, i.e. it runs partly in the background. As this method might be called multiple times 
    /// quickly after each other, a mechanism has been implemented that <b>all</b> tasks from previous method calls are first canceled before the task is started anew.</remarks> 
    public async void ParameterExtraction() { 

     CancellationTokenSource newCancellationTokenSource = new CancellationTokenSource(); 

     // Define the task which shall run in the background. 
     Task newTask = new Task(() => { 
      // do some work here 
       } 
      } 
     }, newCancellationTokenSource.Token); 

     _parameterExtractionTasks.Add(new Tuple<Task, CancellationTokenSource>(newTask, newCancellationTokenSource)); 

     /* Convert the list to arrays as an exception is thrown if the number of entries in a list changes while 
     * we are in a for loop. This can happen if this method is called again while we are waiting for a task. */ 
     Task[] taskArray = _parameterExtractionTasks.ConvertAll(item => item.Item1).ToArray(); 
     CancellationTokenSource[] tokenSourceArray = _parameterExtractionTasks.ConvertAll(item => item.Item2).ToArray(); 

     for (int i = 0; i < taskArray.Length - 1; i++) { // -1: the last task, i.e. the most recent task, shall be run and not canceled. 
      // Cancel all running tasks which were started by previous calls of this method 
      if (taskArray[i].Status == TaskStatus.Running) { 
       tokenSourceArray[i].Cancel(); 
       await taskArray[i]; // wait till the canceling completed 
      } 
     } 

     // Get the most recent task 
     Task currentThreadToRun = taskArray[taskArray.Length - 1]; 

     // Start this task if, but only if it has not been started before (i.e. if it is still in Created state). 
     if (currentThreadToRun.Status == TaskStatus.Created) { 
      currentThreadToRun.Start(); 
      await currentThreadToRun; // wait till this task is completed. 
     } 

     // Now the task has been completed once. Thus we can recent the list of tasks to cancel or maybe run. 
     _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>(); 
    } 
Các vấn đề liên quan