2012-11-14 48 views
5

Tôi đang gặp khó khăn trong việc tìm một công cụ lập lịch tác vụ mà tôi có thể lên lịch các tác vụ ưu tiên. Nó giống như những gì Task.Run cố gắng giải quyết, nhưng bạn không thể chỉ định một công cụ lên lịch cho Task.Run. Tôi đã sử dụng một số QueuedTaskScheduler từ số Parallel Extensions Extras Samples để giải quyết yêu cầu mức độ ưu tiên của nhiệm vụ (cũng được đề xuất bởi điều này post).Trình lập lịch biểu tác vụ cấp đồng thời có giới hạn (với nhiệm vụ ưu tiên) xử lý các tác vụ được gói

Dưới đây là ví dụ của tôi:

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    static void Main(string[] args) 
    { 
     ts_priority1 = queueScheduler.ActivateNewQueue(1); 
     ts_priority2 = queueScheduler.ActivateNewQueue(2); 

     QueueValue(1, ts_priority2); 
     QueueValue(2, ts_priority2); 
     QueueValue(3, ts_priority2); 
     QueueValue(4, ts_priority1); 
     QueueValue(5, ts_priority1); 
     QueueValue(6, ts_priority1); 

     Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
     return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
     return QueueTask(async() => 
     { 
      Console.WriteLine("Start {0}", i); 
      await Task.Delay(1000); 
      Console.WriteLine("End {0}", i); 
     }, ts); 
    } 
} 

Sản lượng đặc trưng của ví dụ trên là:

Start 4 
Start 5 
Start 6 
Start 1 
Start 2 
Start 3 
End 4 
End 3 
End 5 
End 2 
End 1 
End 6 

Những gì tôi muốn là:

Start 4 
End 4 
Start 5 
End 5 
Start 6 
End 6 
Start 1 
End 1 
Start 2 
End 2 
Start 3 
End 3 

EDIT:

Tôi nghĩ rằng tôi đang tìm kiếm một công cụ lên lịch, tương tự như QueuedTaskScheduler, điều đó sẽ giải quyết vấn đề này. Nhưng bất kỳ đề xuất nào khác đều được hoan nghênh.

+0

Vâng, những gì bạn muốn là xử lý mức độ ưu tiên của tác vụ, nhưng không chạy chúng ở chế độ song song? bạn có thể không chỉ giới hạn số lượng các chuỗi đồng thời trong bộ lập lịch biểu của bạn không? – Kek

+0

@Kek 'new QueuedTaskScheduler (targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1);' ở trên thực hiện chính xác (giới hạn số lượng các chuỗi đồng thời tới 1) –

Trả lời

2

Giải pháp tốt nhất mà tôi có thể tìm thấy là làm cho phiên bản riêng của tôi về QueuedTaskScheduler (ban đầu được tìm thấy trong mã Parallel Extensions Extras Samples nguồn).

Tôi đã thêm tham số bool awaitWrappedTasks vào các hàm tạo của QueuedTaskScheduler.

public QueuedTaskScheduler(
     TaskScheduler targetScheduler, 
     int maxConcurrencyLevel, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 
    ... 
} 

public QueuedTaskScheduler(
     int threadCount, 
     string threadName = "", 
     bool useForegroundThreads = false, 
     ThreadPriority threadPriority = ThreadPriority.Normal, 
     ApartmentState threadApartmentState = ApartmentState.MTA, 
     int threadMaxStackSize = 0, 
     Action threadInit = null, 
     Action threadFinally = null, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 

    // code starting threads (removed here in example) 
    ... 
} 

sau đó tôi sửa đổi phương pháp ProcessPrioritizedAndBatchedTasks()async

private async void ProcessPrioritizedAndBatchedTasks() 

sau đó tôi sửa đổi mã ngay sau phần nơi scheduled task được thực hiện:

private async void ProcessPrioritizedAndBatchedTasks() 
{ 
    bool continueProcessing = true; 
    while (!_disposeCancellation.IsCancellationRequested && continueProcessing) 
    { 
     try 
     { 
      // Note that we're processing tasks on this thread 
      _taskProcessingThread.Value = true; 

      // Until there are no more tasks to process 
      while (!_disposeCancellation.IsCancellationRequested) 
      { 
       // Try to get the next task. If there aren't any more, we're done. 
       Task targetTask; 
       lock (_nonthreadsafeTaskQueue) 
       { 
        if (_nonthreadsafeTaskQueue.Count == 0) break; 
        targetTask = _nonthreadsafeTaskQueue.Dequeue(); 
       } 

       // If the task is null, it's a placeholder for a task in the round-robin queues. 
       // Find the next one that should be processed. 
       QueuedTaskSchedulerQueue queueForTargetTask = null; 
       if (targetTask == null) 
       { 
        lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 
       } 

       // Now if we finally have a task, run it. If the task 
       // was associated with one of the round-robin schedulers, we need to use it 
       // as a thunk to execute its task. 
       if (targetTask != null) 
       { 
        if (queueForTargetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
        else TryExecuteTask(targetTask); 

        // ***** MODIFIED CODE START **** 
        if (_awaitWrappedTasks) 
        { 
         var targetTaskType = targetTask.GetType(); 
         if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
         { 
          dynamic targetTaskDynamic = targetTask; 
          // Here we await the completion of the proxy task. 
          // We do not await the proxy task directly, because that would result in that await will throw the exception of the wrapped task (if one existed) 
          // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
          await TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously); 
         } 
        } 
        // ***** MODIFIED CODE END **** 
       } 
      } 
     } 
     finally 
     { 
      // Now that we think we're done, verify that there really is 
      // no more work to do. If there's not, highlight 
      // that we're now less parallel than we were a moment ago. 
      lock (_nonthreadsafeTaskQueue) 
      { 
       if (_nonthreadsafeTaskQueue.Count == 0) 
       { 
        _delegatesQueuedOrRunning--; 
        continueProcessing = false; 
        _taskProcessingThread.Value = false; 
       } 
      } 
     } 
    } 
} 

Việc thay đổi phương pháp ThreadBasedDispatchLoop có một chút khác biệt, trong đó chúng tôi không thể sử dụng từ khóa async hoặc người nào khác, chúng tôi sẽ phá vỡ chức năng cũ tạo ra các nhiệm vụ theo lịch trình trong các chủ đề chuyên dụng. Vì vậy, đây là phiên bản sửa đổi của ThreadBasedDispatchLoop

private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) 
{ 
    _taskProcessingThread.Value = true; 
    if (threadInit != null) threadInit(); 
    try 
    { 
     // If the scheduler is disposed, the cancellation token will be set and 
     // we'll receive an OperationCanceledException. That OCE should not crash the process. 
     try 
     { 
      // If a thread abort occurs, we'll try to reset it and continue running. 
      while (true) 
      { 
       try 
       { 
        // For each task queued to the scheduler, try to execute it. 
        foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)) 
        { 
         Task targetTask = task; 
         // If the task is not null, that means it was queued to this scheduler directly. 
         // Run it. 
         if (targetTask != null) 
         { 
          TryExecuteTask(targetTask); 
         } 
         // If the task is null, that means it's just a placeholder for a task 
         // queued to one of the subschedulers. Find the next task based on 
         // priority and fairness and run it. 
         else 
         { 
          // Find the next task based on our ordering rules...          
          QueuedTaskSchedulerQueue queueForTargetTask; 
          lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 

          // ... and if we found one, run it 
          if (targetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
         } 

         if (_awaitWrappedTasks) 
         { 
          var targetTaskType = targetTask.GetType(); 
          if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
          { 
           dynamic targetTaskDynamic = targetTask; 
           // Here we wait for the completion of the proxy task. 
           // We do not wait for the proxy task directly, because that would result in that Wait() will throw the exception of the wrapped task (if one existed) 
           // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
           TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously).Wait(); 
          } 
         } 
        } 
       } 
       catch (ThreadAbortException) 
       { 
        // If we received a thread abort, and that thread abort was due to shutting down 
        // or unloading, let it pass through. Otherwise, reset the abort so we can 
        // continue processing work items. 
        if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload()) 
        { 
         Thread.ResetAbort(); 
        } 
       } 
      } 
     } 
     catch (OperationCanceledException) { } 
    } 
    finally 
    { 
     // Run a cleanup routine if there was one 
     if (threadFinally != null) threadFinally(); 
     _taskProcessingThread.Value = false; 
    } 
} 

Tôi đã thử nghiệm này và nó mang lại cho các đầu ra mong muốn. Kỹ thuật này cũng có thể được sử dụng cho bất kỳ trình lên lịch nào khác. Ví dụ. LimitedConcurrencyLevelTaskSchedulerOrderedTaskScheduler

+0

Đang đợi tác vụ trong bộ lập lịch sẽ hủy giá trị của IO không đồng bộ. Nếu bạn không cần IO không đồng bộ thì bạn có thể chuyển sang các cơ quan nhiệm vụ đồng bộ. – usr

+0

+1 rồi. Tôi đã học được rất nhiều trong câu hỏi này. Không hoàn toàn thuyết phục rằng giải pháp này là thích hợp hơn với một 'AsyncSemaphore' nhưng tôi sẽ suy nghĩ về nó. – usr

+0

Bạn đang thực hiện một phương thức 'async-void' từ bên trong thực thi' TaskScheduler'? Đáng sợ, tôi tự hỏi rằng @StephenCleary không có gì để nói về điều này. – springy76

0

Tôi nghĩ rằng không thể đạt được mục tiêu này. Một vấn đề cốt lõi có vẻ là một TaskScheduler chỉ có thể được sử dụng để chạy mã. Nhưng có những nhiệm vụ không chạy mã, chẳng hạn như nhiệm vụ IO hoặc nhiệm vụ hẹn giờ. Tôi không nghĩ rằng cơ sở hạ tầng TaskScheduler có thể được sử dụng để lên lịch cho những cơ sở hạ tầng đó.

Từ góc nhìn của một TaskScheduler nó trông như thế này:

1. Select a registered task for execution 
2. Execute its code on the CPU 
3. Repeat 

Bước (2) là đồng bộ có nghĩa là Task để được thực thi phải bắt đầu và kết thúc như một phần của bước (2). Điều này có nghĩa rằng điều này Task không thể làm async IO bởi vì đó sẽ là không chặn. Theo nghĩa đó, TaskScheduler chỉ hỗ trợ mã chặn.

Tôi nghĩ rằng bạn sẽ được phục vụ tốt nhất bằng cách tự mình triển khai phiên bản AsyncSemaphore phát hành bồi bàn theo thứ tự ưu tiên và điều chỉnh. Các phương thức async của bạn có thể chờ đợi semaphore đó theo cách không bị chặn. Tất cả công việc CPU có thể chạy trên nhóm mặc định, do đó không cần phải bắt đầu các chủ đề tùy chỉnh bên trong một tuỳ chỉnh TaskScheduler. Các tác vụ IO có thể tiếp tục sử dụng IO không bị chặn.

+0

những gì bạn đã giải thích ở đây tôi đã thử và về cơ bản nó có cùng đầu ra (như trong vấn đề ban đầu). Trong đề xuất của bạn, 'firstPartTask' được lên lịch trên task scheduler xếp hàng, nhưng hoàn thành ngay sau khi nó truy cập' await' đầu tiên và scheduler đơn giản thực hiện "phần đầu tiên" tiếp theo trong hàng đợi ngay cả khi "task bên trong" trước đó (việc đặt lại tác vụ sau lần đầu tiên 'đang chờ') chưa hoàn thành. Tôi chỉ có thể nghĩ rằng điều này sẽ được giải quyết bởi một ** lịch trình ** xử lý kịch bản này tôi đang tìm kiếm và không thể được giải quyết bằng một số phép thuật bên ngoài trình lên lịch. –

+0

Tôi tin rằng bạn đúng. Tôi đã thêm một số suy nghĩ và gợi ý. Xin vui lòng cho tôi biết những gì bạn nghĩ. – usr

+0

Cảm ơn bạn đã cập nhật. Đề xuất của bạn bằng cách sử dụng khóa semaphore chính xác là những gì người dùng đề xuất trong [answer] sau (http://stackoverflow.com/a/13379980/1514235) (xem nhận xét của tôi). Đề xuất của bạn rằng một trình lập lịch biểu chỉ thực thi các tác vụ của nó một cách đồng bộ là có phần đúng, nhưng điều gì sẽ xảy ra nếu trình lập lịch đang chờ nhiệm vụ "được bao bọc" của mỗi tác vụ trước khi thực hiện bất kỳ tác vụ nào khác trong hàng đợi. Tôi nghĩ rằng điều này đã cho tôi một ý tưởng ... cảm ơn (sẽ cho bạn biết nếu tôi đến với một cái gì đó). –

3

Thật không may, điều này không thể giải quyết được với TaskScheduler, vì chúng luôn hoạt động ở mức Task và phương thức async hầu như luôn chứa nhiều Task s.

Bạn nên sử dụng SemaphoreSlim kết hợp với bộ lập lịch ưu tiên. Ngoài ra, bạn có thể sử dụng AsyncLock (cũng được bao gồm trong số AsyncEx library) của tôi.

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    private static SemaphoreSlim semaphore = new SemaphoreSlim(1); 
    static void Main(string[] args) 
    { 
    ts_priority1 = queueScheduler.ActivateNewQueue(1); 
    ts_priority2 = queueScheduler.ActivateNewQueue(2); 

    QueueValue(1, ts_priority2); 
    QueueValue(2, ts_priority2); 
    QueueValue(3, ts_priority2); 
    QueueValue(4, ts_priority1); 
    QueueValue(5, ts_priority1); 
    QueueValue(6, ts_priority1); 

    Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
    return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts).Unwrap(); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
    return QueueTask(async() => 
    { 
     await semaphore.WaitAsync(); 
     try 
     { 
     Console.WriteLine("Start {0}", i); 
     await Task.Delay(1000); 
     Console.WriteLine("End {0}", i); 
     } 
     finally 
     { 
     semaphore.Release(); 
     } 
    }, ts); 
    } 
} 
+1

Điều này trông giống như một giải pháp thú vị. Tuy nhiên, tôi thấy một vấn đề với điều này. Mặc dù giải pháp sẽ (lúc đầu) cho kết quả đầu ra chính xác (như trong câu hỏi này), nhưng nó sẽ phá vỡ mức ưu tiên của các tác vụ được thực hiện. Trình lên lịch sẽ thực hiện tất cả các nhiệm vụ (trong mức độ ưu tiên chính xác) cho đến khi 'await semaphore.WaitAsync()' nhưng các tác vụ có mức ưu tiên cao hơn sẽ không được giải phóng khỏi khóa trước khi các tác vụ có mức độ ưu tiên thấp hơn. Điều này đặc biệt đúng nếu các tác vụ ưu tiên cao hơn được lên lịch sau khi các tác vụ ưu tiên thấp hơn (vẫn đang chờ để được giải phóng khỏi khóa). –

+0

Trong trường hợp đó, bạn sẽ cần một khóa dựa trên mức độ ưu tiên thực sự, không tồn tại vì AFAIK không ai khác cần đến. Bạn sẽ phải xây dựng của riêng bạn. –

+0

Tôi đã thêm [answer] của riêng mình (http://stackoverflow.com/a/13414364/1514235). Vui lòng xem và xem bạn nghĩ gì. –

Các vấn đề liên quan