2009-02-12 31 views
38

Tôi xin lỗi vì một câu hỏi dư thừa. Tuy nhiên, tôi đã tìm thấy nhiều giải pháp cho vấn đề của mình nhưng không có giải pháp nào được giải thích rõ ràng. Tôi hy vọng rằng nó sẽ được làm rõ, ở đây.Đợi các chủ đề được gộp lại để hoàn thành

Chủ đề chính của ứng dụng C# của tôi sinh ra 1..n nhân viên nền sử dụng ThreadPool. Tôi muốn cho các chủ đề ban đầu để khóa cho đến khi tất cả các công nhân đã hoàn thành. Tôi đã nghiên cứu ManualResetEvent nói riêng nhưng tôi không rõ ràng về việc sử dụng nó.

Trong giả:

foreach(var o in collection) 
{ 
    queue new worker(o); 
} 

while(workers not completed) { continue; } 

Nếu cần thiết, tôi sẽ biết số lượng công nhân mà là về để được xếp hàng trước mặt.

+0

Hi hãy nhìn vào bài tương tự ở đây http://stackoverflow.com/questions/358721/be-notified-when-all-background-threadpool-threads-are-finished – Valentin

Trả lời

54

Hãy thử điều này. Hàm này có trong danh sách các đại biểu Action. Nó sẽ thêm một mục nhập công nhân ThreadPool cho mỗi mục trong danh sách. Nó sẽ chờ cho mọi hành động hoàn thành trước khi trở về.

public static void SpawnAndWait(IEnumerable<Action> actions) 
{ 
    var list = actions.ToList(); 
    var handles = new ManualResetEvent[actions.Count()]; 
    for (var i = 0; i < list.Count; i++) 
    { 
     handles[i] = new ManualResetEvent(false); 
     var currentAction = list[i]; 
     var currentHandle = handles[i]; 
     Action wrappedAction =() => { try { currentAction(); } finally { currentHandle.Set(); } }; 
     ThreadPool.QueueUserWorkItem(x => wrappedAction()); 
    } 

    WaitHandle.WaitAll(handles); 
} 
+6

WaitHandle.WaitAll không thành công nếu số lượng xử lý lớn hơn hệ thống cho phép. Trên máy chủ Win2k3 của tôi số đó là 64 vì vậy tôi nhận được một ngoại lệ khi tôi cố gắng đẻ trứng hơn 64 mục ... –

+1

@Eran, hãy thử viết một SpawAndWaitHelper mà về cơ bản có mã ở trên. Sử dụng SpawAndWait để chia số đếm thành 64 kích thước khối và gọi người trợ giúp cho từng đoạn. – JaredPar

+0

ah ... http://stackoverflow.com/questions/1045980/is-there-a-better-way-to-wait-for-queued-threads/1074770#1074770 –

13

Đầu tiên, công nhân thực hiện trong bao lâu? chủ đề hồ bơi nói chung nên được sử dụng cho các nhiệm vụ ngắn ngủi - nếu chúng sẽ chạy trong một thời gian, hãy xem xét các chuỗi thủ công.

Khắc phục sự cố; bạn có thực sự cần chặn luồng chính không? Bạn có thể sử dụng gọi lại thay thế không? Nếu có, chẳng hạn như:

int running = 1; // start at 1 to prevent multiple callbacks if 
      // tasks finish faster than they are started 
Action endOfThread = delegate { 
    if(Interlocked.Decrement(ref running) == 0) { 
     // ****run callback method**** 
    } 
}; 
foreach(var o in collection) 
{ 
    var tmp = o; // avoid "capture" issue 
    Interlocked.Increment(ref running); 
    ThreadPool.QueueUserWorkItem(delegate { 
     DoSomeWork(tmp); // [A] should handle exceptions internally 
     endOfThread(); 
    }); 
} 
endOfThread(); // opposite of "start at 1" 

Đây là cách khá nhẹ (không có hệ điều hành) để theo dõi công nhân.

Nếu bạn cần chặn, bạn có thể làm tương tự bằng cách sử dụng Monitor (một lần nữa, tránh một đối tượng OS):

object syncLock = new object(); 
    int running = 1; 
    Action endOfThread = delegate { 
     if (Interlocked.Decrement(ref running) == 0) { 
      lock (syncLock) { 
       Monitor.Pulse(syncLock); 
      } 
     } 
    }; 
    lock (syncLock) { 
     foreach (var o in collection) { 
      var tmp = o; // avoid "capture" issue 
      ThreadPool.QueueUserWorkItem(delegate 
      { 
       DoSomeWork(tmp); // [A] should handle exceptions internally 
       endOfThread(); 
      }); 
     } 
     endOfThread(); 
     Monitor.Wait(syncLock); 
    } 
    Console.WriteLine("all done"); 
+2

Mã của bạn sẽ chờ đợi vô hạn nếu một trong các đại biểu ném một ngoại lệ. – JaredPar

+2

Nếu một trong những đại biểu ném một ngoại lệ, tôi sẽ mất toàn bộ quá trình, vì vậy đó là khá tùy ý ... Tôi giả sử nó sẽ không ném, nhưng tôi sẽ làm cho nó rõ ràng ;-p –

+0

công nhân sẽ xử lý các hoạt động tốn kém bao gồm đọc và ghi các tệp và thực hiện các lựa chọn và chèn SQL liên quan đến các cột nhị phân/hình ảnh. Không chắc họ sẽ sống đủ lâu để yêu cầu các chủ đề rõ ràng, nhưng hiệu suất có thể đạt được bằng cách cho phép chúng thực thi song song. – Kivin

1

Tôi nghĩ bạn đã đi đúng hướng với ManualResetEvent. link có mẫu mã phù hợp chặt chẽ với những gì bạn đang cố gắng thực hiện. Điều quan trọng là sử dụng WaitHandle.WaitAll và vượt qua một loạt các sự kiện chờ đợi. Mỗi thread cần thiết lập một trong những sự kiện chờ đợi này.

// Simultaneously calculate the terms. 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateBase)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateFirstTerm)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateSecondTerm)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateThirdTerm)); 

    // Wait for all of the terms to be calculated. 
    WaitHandle.WaitAll(autoEvents); 

    // Reset the wait handle for the next calculation. 
    manualEvent.Reset(); 

Edit:

Hãy chắc chắn rằng trong công nhân đường dẫn mã đề của bạn, bạn thiết lập các sự kiện (ví dụ: autoEvents 1 .set();). Một khi tất cả chúng đều báo hiệu waitAll sẽ quay trở lại.

void CalculateSecondTerm(object stateInfo) 
{ 
    double preCalc = randomGenerator.NextDouble(); 
    manualEvent.WaitOne(); 
    secondTerm = preCalc * baseNumber * 
     randomGenerator.NextDouble(); 
    autoEvents[1].Set(); 
} 
29

Đây là cách tiếp cận khác - đóng gói; do đó, mã của bạn có thể đơn giản như:

Forker p = new Forker(); 
    foreach (var obj in collection) 
    { 
     var tmp = obj; 
     p.Fork(delegate { DoSomeWork(tmp); }); 
    } 
    p.Join(); 

Trường hợp lớp Forker được đưa ra dưới đây (Tôi đã chán trên ;-p tàu) ... một lần nữa, điều này tránh được đối tượng hệ điều hành, nhưng kết thúc tốt đẹp mọi thứ lên khá gọn gàng (IMO):

using System; 
using System.Threading; 

/// <summary>Event arguments representing the completion of a parallel action.</summary> 
public class ParallelEventArgs : EventArgs 
{ 
    private readonly object state; 
    private readonly Exception exception; 
    internal ParallelEventArgs(object state, Exception exception) 
    { 
     this.state = state; 
     this.exception = exception; 
    } 

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary> 
    public object State { get { return state; } } 

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary> 
    public Exception Exception { get { return exception; } } 
} 

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary> 
public sealed class Forker 
{ 
    int running; 
    private readonly object joinLock = new object(), eventLock = new object(); 

    /// <summary>Raised when all operations have completed.</summary> 
    public event EventHandler AllComplete 
    { 
     add { lock (eventLock) { allComplete += value; } } 
     remove { lock (eventLock) { allComplete -= value; } } 
    } 
    private EventHandler allComplete; 
    /// <summary>Raised when each operation completes.</summary> 
    public event EventHandler<ParallelEventArgs> ItemComplete 
    { 
     add { lock (eventLock) { itemComplete += value; } } 
     remove { lock (eventLock) { itemComplete -= value; } } 
    } 
    private EventHandler<ParallelEventArgs> itemComplete; 

    private void OnItemComplete(object state, Exception exception) 
    { 
     EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock 
     if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception)); 
     if (Interlocked.Decrement(ref running) == 0) 
     { 
      EventHandler allHandler = allComplete; // don't need to lock 
      if (allHandler != null) allHandler(this, EventArgs.Empty); 
      lock (joinLock) 
      { 
       Monitor.PulseAll(joinLock); 
      } 
     } 
    } 

    /// <summary>Adds a callback to invoke when each operation completes.</summary> 
    /// <returns>Current instance (for fluent API).</returns> 
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 
     ItemComplete += handler; 
     return this; 
    } 

    /// <summary>Adds a callback to invoke when all operations are complete.</summary> 
    /// <returns>Current instance (for fluent API).</returns> 
    public Forker OnAllComplete(EventHandler handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 
     AllComplete += handler; 
     return this; 
    } 

    /// <summary>Waits for all operations to complete.</summary> 
    public void Join() 
    { 
     Join(-1); 
    } 

    /// <summary>Waits (with timeout) for all operations to complete.</summary> 
    /// <returns>Whether all operations had completed before the timeout.</returns> 
    public bool Join(int millisecondsTimeout) 
    { 
     lock (joinLock) 
     { 
      if (CountRunning() == 0) return true; 
      Thread.SpinWait(1); // try our luck... 
      return (CountRunning() == 0) || 
       Monitor.Wait(joinLock, millisecondsTimeout); 
     } 
    } 

    /// <summary>Indicates the number of incomplete operations.</summary> 
    /// <returns>The number of incomplete operations.</returns> 
    public int CountRunning() 
    { 
     return Interlocked.CompareExchange(ref running, 0, 0); 
    } 

    /// <summary>Enqueues an operation.</summary> 
    /// <param name="action">The operation to perform.</param> 
    /// <returns>The current instance (for fluent API).</returns> 
    public Forker Fork(ThreadStart action) { return Fork(action, null); } 

    /// <summary>Enqueues an operation.</summary> 
    /// <param name="action">The operation to perform.</param> 
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param> 
    /// <returns>The current instance (for fluent API).</returns> 
    public Forker Fork(ThreadStart action, object state) 
    { 
     if (action == null) throw new ArgumentNullException("action"); 
     Interlocked.Increment(ref running); 
     ThreadPool.QueueUserWorkItem(delegate 
     { 
      Exception exception = null; 
      try { action(); } 
      catch (Exception ex) { exception = ex;} 
      OnItemComplete(state, exception); 
     }); 
     return this; 
    } 
} 
+0

(HI MARC! Hãy nhớ bài đăng này?) Trong tò mò, tại sao var tmp = obj cần thiết? Tôi thực hiện nó bằng cách chỉ truyền đối tượng của tôi vào và tôi đã có kết quả điên rồ. Thay đổi nó sang sử dụng var đã kết thúc. Tôi rõ ràng không hiểu điều gì đó! Cảm ơn, và xem bạn có thể nhớ sau hai năm không :) – DanTheMan

+1

@user Câu trả lời cho điều đó hơi phức tạp, nhưng trong ngắn hạn, đó là vì C# không lặng lẽ làm chính xác những gì bạn muốn nói mà không hề nhận ra. Nói chung là khá tốt khi làm điều này một cách rõ ràng ở mọi nơi, nhưng không phải trong trường hợp này. –

+4

Bạn cần hiểu rằng mã 'delegate {DoSomeWork (tmp); } '* nắm bắt * biến' tmp'. Mỗi cuộc gọi đến mã này sẽ ghi lại một biến * khác nhau * mỗi lần vòng lặp, vì phạm vi 'tmp' bị giới hạn trong phần thân của vòng lặp. Tuy nhiên, biến 'foreach' là biến * same * mỗi lần quanh vòng lặp, vì vậy tất cả các cuộc gọi đến' delegate {DoSomeWork (tmp); } 'nắm bắt cùng một điều. Điều này thực sự không cần phải như thế này; hạn chế phạm vi của biến foreach sẽ tạo ra rất nhiều mã "chỉ làm việc" mà không có ai thậm chí còn nhận ra sự phức tạp của tình huống. –

1

Sử dụng .NET 4.0 Barrie r lớp:

 Barrier sync = new Barrier(1); 

     foreach(var o in collection) 
     { 
      WaitCallback worker = (state) => 
      { 
       // do work 
       sync.SignalAndWait(); 
      }; 

      sync.AddParticipant(); 
      ThreadPool.QueueUserWorkItem(worker, o); 
     } 

     sync.SignalAndWait(); 
+1

Có giới hạn trên về số lượng người tham gia có thể được sử dụng. :( –

8

Tôi đã được sử dụng thư viện nhiệm vụ song song mới trong CTP here:

 Parallel.ForEach(collection, o => 
      { 
       DoSomeWork(o); 
      }); 
+0

Đề xuất tốt! Ngoài ra dễ dàng hơn khi nói đến xử lý ngoại lệ. Xem: http://msdn.microsoft.com/en-us/library/dd991486.aspx – Joop

+0

Hãy đặc biệt thận trọng vì điều này sử dụng ThreadPool và nó không phải là có thể Thậm chí sử dụng TaskFactory bên dưới với tùy chọn LongRunning chỉ cung cấp một gợi ý tại trình lên lịch, nhưng không phải là một sự đảm bảo cho một luồng chuyên dụng. – eduncan911

3

Đây là một giải pháp sử dụng lớp CountdownEvent.

var complete = new CountdownEvent(1); 
foreach (var o in collection) 
{ 
    var capture = o; 
    ThreadPool.QueueUserWorkItem((state) => 
    { 
     try 
     { 
     DoSomething(capture); 
     } 
     finally 
     { 
     complete.Signal(); 
     } 
    }, null); 
} 
complete.Signal(); 
complete.Wait(); 

Tất nhiên, nếu bạn có quyền truy cập vào lớp CountdownEvent thì bạn có toàn bộ TPL hợp tác. Lớp học Parallel sẽ đảm bảo việc chờ bạn.

Parallel.ForEach(collection, o => 
    { 
    DoSomething(o); 
    }); 
Các vấn đề liên quan