2012-01-12 26 views
5

Tôi đang sử dụng các lớp Nhiệm vụ để thực hiện tính toán đa luồng.Tôi có thể giới hạn số lượng đối tượng System.Threading.Tasks.Task chạy đồng thời không?

Ví dụ:

List<Task> taskList = new List<Task>(); 
    for(int i=0;i<10;i++){ 
     var task = new Task(() => Console.WriteLine("Hello from taskA.")); 
     taskList.Add(task); 
     task.Start(); 
    } 

Có cách nào để làm mà thôi, chúng ta hãy nói, 3 nhiệm vụ để chạy nhiều nhất, và phần còn lại để chờ đợi?

+5

Có thể trùng lặp http://stackoverflow.com/questions/2898609/system-threading-tasks-limit-the-number-of-concurrent-tasks – Samich

Trả lời

3

tôi khuyên bạn nên kiểm tra bài here.

Để diễn giải, bạn tạo một tập hợp các tác vụ có liên quan, sau đó sử dụng ParallelOptions.MaxDegreeOfParallelism để kiểm soát số lượng được thực hiện cùng một lúc.

4

My blog post cho biết cách thực hiện điều này cả với Tác vụ và Tác vụ và cung cấp dự án mẫu bạn có thể tải xuống và chạy để xem cả hai đang hoạt động.

Với hành động

Nếu sử dụng Tác vụ, bạn có thể sử dụng chức năng .Net Parallel.Invoke được tích hợp sẵn. Ở đây chúng tôi giới hạn nó để chạy tối đa 3 luồng song song.

var listOfActions = new List<Action>(); 
for (int i = 0; i < 10; i++) 
{ 
    // Note that we create the Action here, but do not start it. 
    listOfActions.Add(() => DoSomething()); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 3}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

Với Nhiệm vụ

Vì bạn đang sử dụng Công việc ở đây, mặc dù không có tích hợp chức năng. Tuy nhiên, bạn có thể sử dụng cái mà tôi cung cấp trên blog của mình.

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don&#39;t enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       throttler.Wait(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler&#39;s using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      Task.WaitAll(postTaskTasks.ToArray(), cancellationToken); 
     } 
    } 

Và sau đó tạo ra danh sách các nhiệm vụ và gọi hàm có họ chạy, với nói tối đa là 3 đồng thời cùng một lúc, bạn có thể làm điều này:

var listOfTasks = new List<Task>(); 
for (int i = 0; i < 10; i++) 
{ 
    var count = i; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => Something())); 
} 
Tasks.StartAndWaitAllThrottled(listOfTasks, 3); 
-1

Bạn có thể sử dụng này một cái chung là

public TaskThrottle(int initialCount, int maxTasksToRunInParallel) 
{ 
    _semaphore = new SemaphoreSlim(initialCount, maxTasksToRunInParallel); 
} 

public void TaskThrottler<T>(IEnumerable<Task<T>> tasks, int timeoutInMilliseconds, CancellationToken cancellationToken = default(CancellationToken)) where T : class 
{ 
     // Get Tasks as List 
     var taskList = tasks as IList<Task<T>> ?? tasks.ToList(); 
     var semaphoreTasks = new List<Task<int>>(); 

     // When the first task completed, flag as done/release 
     taskList.ForEach(x => 
     { 
      semaphoreTasks.Add(x.ContinueWith(y => _semaphore.Release(), cancellationToken)); 
     }); 

     semaphoreTasks.ForEach(async x => 
     { 
      // It will not pass this until one free slot available or timeout occure 
      if(timeoutInMilliseconds > 0) 
       await _semaphore.WaitAsync(timeoutInMilliseconds, cancellationToken); 
      else 
       await _semaphore.WaitAsync(cancellationToken); 

      // Throws a OperationCanceledException if this token has had cancellation requested 
      cancellationToken.ThrowIfCancellationRequested(); 

      // Start the task 
      x.Start(); 
     }); 

     Task.WaitAll(semaphoreTasks.ToArray(), cancellationToken); 
} 
Các vấn đề liên quan