2012-01-13 41 views
21

Tôi đang viết chương trình C# để tạo và tải lên một nửa triệu tệp qua FTP. Tôi muốn xử lý 4 tập tin song song vì máy có 4 lõi và quá trình tạo tập tin mất nhiều thời gian hơn. Có thể chuyển đổi ví dụ Powershell sau thành C#? Hoặc là có bất kỳ khuôn khổ tốt hơn như khung Actor trong C# (như F # MailboxProcessor)?Giới hạn số chuỗi song song trong C#

Powershell example

$maxConcurrentJobs = 3; 

# Read the input and queue it up 
$jobInput = get-content .\input.txt 
$queue = [System.Collections.Queue]::Synchronized((New-Object System.Collections.Queue)) 
foreach($item in $jobInput) 
{ 
    $queue.Enqueue($item) 
} 

# Function that pops input off the queue and starts a job with it 
function RunJobFromQueue 
{ 
    if($queue.Count -gt 0) 
    { 
     $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() 
     Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null 
    } 
} 

# Start up to the max number of concurrent jobs 
# Each job will take care of running the rest 
for($i = 0; $i -lt $maxConcurrentJobs; $i++) 
{ 
    RunJobFromQueue 
} 

Cập nhật:
Kết nối đến máy chủ FTP từ xa có thể được làm chậm vì vậy tôi muốn giới hạn việc xử lý FTP tải lên.

+0

Nếu bạn muốn giới hạn số lượng tác vụ song song, tại sao không sử dụng TPL? –

+1

Nhóm chủ đề phải đủ thông minh để xử lý việc này cho bạn. Tại sao cố gắng tự mình quản lý nó? –

+3

Bạn có thể sử dụng [PLINQ] (http://msdn.microsoft.com/en-us/library/dd460688.aspx) và đặt [WithDegreeOfParallelism] (http://msdn.microsoft.com/en-us/library/ dd383719.aspx) cho phù hợp. –

Trả lời

5

Nếu bạn đang sử dụng Net 4.0 bạn có thể sử dụng Parallel library

Giả sử bạn đang iterating throug nửa triệu các tập tin bạn có thể "song song" lặp sử dụng một Parallel Foreach for instance hoặc bạn có thể have a look to PLinq Dưới đây là một comparison between the two

+0

Hãy biện minh cho -1. –

+0

Câu hỏi này được gắn thẻ với C# -4.0, rõ ràng anh ta rất giống với các phần mở rộng và sử dụng .NET 4. Một câu duy nhất KHÔNG trả lời câu hỏi của anh ta. –

+0

Rõ ràng anh ta đang sử dụng C# 4.0 nhưng nó không rõ ràng anh ta quen thuộc với thư viện song song, nếu không anh ta sẽ không hỏi một câu hỏi. Ngoài ra, câu trả lời của tôi chứa nhiều hay ít thông tin giống nhau của người khác. Xin vui lòng biện minh cho -1 xin vui lòng. –

16

Thư viện song song công việc là bạn của bạn ở đây. Xem liên kết this mô tả những gì có sẵn cho bạn. Về cơ bản khuôn khổ 4 đi kèm với nó mà tối ưu hóa các chủ đề cơ bản nền threaded gộp với số lượng bộ vi xử lý trên máy chạy.

Có lẽ cái gì đó dọc theo dòng:

ParallelOptions options = new ParallelOptions(); 

options.MaxDegreeOfParallelism = 4; 

Sau đó, trong một cái gì đó vòng lặp của bạn như:

Parallel.Invoke(options, 
() => new WebClient().Upload("http://www.linqpad.net", "lp.html"), 
() => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html")); 
2

Về cơ bản bạn sẽ muốn tạo một hành động hoặc công tác cho mỗi tập tin tải lên , đặt chúng vào một Danh sách, và sau đó xử lý danh sách đó, giới hạn số có thể được xử lý song song.

My blog post cho biết cách thực hiện việc này cả với Tác vụ và Tác vụ và cung cấp dự án mẫu bạn có thể tải xuống và chạy để xem cả hai đang hoạt động.

Với hành động

Nếu sử dụng Tác vụ, bạn có thể sử dụng chức năng .Net Parallel.Invoke được tích hợp sẵn. Ở đây chúng tôi giới hạn nó để chạy tối đa 4 luồng song song.

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => UploadFile(localFile))); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

Tùy chọn này không hỗ trợ chức năng Tải lên tệp, vì vậy bạn có thể muốn sử dụng Ví dụ tác vụ bên dưới.

Với công việc

Với công việc không có chức năng tích hợp sẵn. Tuy nhiên, bạn có thể sử dụng cái mà tôi cung cấp trên blog của mình.

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. 
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      await Task.WhenAll(postTaskTasks.ToArray()); 
     } 
    } 

Và sau đó tạo ra danh sách các nhiệm vụ và gọi hàm có họ chạy, với nói tối đa là 4 đồng thời cùng một lúc, bạn có thể làm điều này:

var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(async() => await UploadFile(localFile))); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4); 

Ngoài ra, vì đây phương thức hỗ trợ async, nó sẽ không chặn luồng UI như sử dụng Parallel.Invoke hoặc Parallel.ForEach.

0

Tôi đã mã hóa kỹ thuật dưới đây, trong đó tôi sử dụng BlockingCollection làm trình quản lý số lượng chuỗi. Nó khá đơn giản để thực hiện và xử lý công việc. Nó chỉ đơn giản là chấp nhận các đối tượng Task và thêm một giá trị số nguyên vào danh sách chặn, tăng số đếm chuỗi bằng 1. Khi chuỗi kết thúc, nó xóa bỏ đối tượng và giải phóng khối trên thao tác thêm cho các tác vụ sắp tới.

 public class BlockingTaskQueue 
     { 
      private BlockingCollection<int> threadManager { get; set; } = null; 
      public bool IsWorking 
      { 
       get 
       { 
        return threadManager.Count > 0 ? true : false; 
       } 
      } 

      public BlockingTaskQueue(int maxThread) 
      { 
       threadManager = new BlockingCollection<int>(maxThread); 
      } 

      public async Task AddTask(Task task) 
      { 
       Task.Run(() => 
       { 
        Run(task); 
       }); 
      } 

      private bool Run(Task task) 
      { 
       try 
       { 
        threadManager.Add(1); 
        task.Start(); 
        task.Wait(); 
        return true; 

       } 
       catch (Exception ex) 
       { 
        return false; 
       } 
       finally 
       { 
        threadManager.Take(); 
       } 

      } 

     }