2013-07-24 12 views
6

tôi thông qua thực hiện của tôi song song/người tiêu dùng dựa trên các mã trong this questionParallel.ForEach bị đình trệ khi tích hợp với BlockingCollection

class ParallelConsumer<T> : IDisposable 
{ 
    private readonly int _maxParallel; 
    private readonly Action<T> _action; 
    private readonly TaskFactory _factory = new TaskFactory(); 
    private CancellationTokenSource _tokenSource; 
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>(); 
    private Task _task; 

    public ParallelConsumer(int maxParallel, Action<T> action) 
    { 
     _maxParallel = maxParallel; 
     _action = action; 
    } 

    public void Start() 
    { 
     try 
     { 
      _tokenSource = new CancellationTokenSource(); 
      _task = _factory.StartNew(
       () => 
       { 
        Parallel.ForEach(
         _entries.GetConsumingEnumerable(), 
         new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, 
         (item, loopState) => 
         { 
          Log("Taking" + item); 
          if (!_tokenSource.IsCancellationRequested) 
          { 
           _action(item); 
           Log("Finished" + item); 
          } 
          else 
          { 
           Log("Not Taking" + item); 
           _entries.CompleteAdding(); 
           loopState.Stop(); 
          } 
         }); 
       }, 
       _tokenSource.Token); 
     } 
     catch (OperationCanceledException oce) 
     { 
      System.Diagnostics.Debug.WriteLine(oce); 
     } 
    } 

    private void Log(string message) 
    { 
     Console.WriteLine(message); 
    } 

    public void Stop() 
    { 
     Dispose(); 
    } 

    public void Enqueue(T entry) 
    { 
     Log("Enqueuing" + entry); 
     _entries.Add(entry); 
    } 

    public void Dispose() 
    { 
     if (_task == null) 
     { 
      return; 
     } 

     _tokenSource.Cancel(); 
     while (!_task.IsCanceled) 
     { 
     } 

     _task.Dispose(); 
     _tokenSource.Dispose(); 
     _task = null; 
    } 
} 

Và đây là một mã kiểm tra

class Program 
{ 
    static void Main(string[] args) 
    { 
     TestRepeatedEnqueue(100, 1); 
    } 

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount) 
    { 
     bool[] flags = new bool[itemCount]; 
     var consumer = new ParallelConsumer<int>(parallelCount, 
               (i) => 
               { 
                flags[i] = true; 
               } 
      ); 
     consumer.Start(); 
     for (int i = 0; i < itemCount; i++) 
     { 
      consumer.Enqueue(i); 
     } 
     Thread.Sleep(1000); 
     Debug.Assert(flags.All(b => b == true)); 



    } 
} 

Bài kiểm tra luôn luôn thất bại - nó luôn luôn bị mắc kẹt ở khoảng 93th-item từ 100 thử nghiệm. Bất kỳ ý tưởng nào một phần của mã của tôi đã gây ra sự cố này và cách khắc phục?

Trả lời

8

Bạn không thể sử dụng Parallel.Foreach() với BlockingCollection.GetConsumingEnumerable(), như bạn đã khám phá.

Đối với một lời giải thích, xem bài viết trên blog này:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

Đó blog cũng cung cấp mã nguồn cho một phương pháp gọi là GetConsumingPartitioner() mà bạn có thể sử dụng để giải quyết vấn đề.

Trích từ blog: thực hiện GetConsumingEnumerable

BlockingCollection được sử dụng đồng bộ hóa nội BlockingCollection của vốn đã hỗ trợ nhiều người tiêu dùng đồng thời, nhưng ForEach không biết rằng, và logic đếm phân vùng của nó cũng cần phải tham gia một khóa trong khi truy cập số đếm.

Như vậy, có nhiều đồng bộ hóa ở đây hơn là thực sự cần thiết, dẫn đến một lần truy cập có khả năng không thể bỏ qua.

[Ngoài ra] thuật toán phân vùng được sử dụng theo mặc định bởi cả Parallel.ForEach và PLINQ sử dụng chunking để giảm thiểu chi phí đồng bộ hóa: thay vì lấy khóa một lần cho mỗi phần tử, nó sẽ lấy khóa, lấy một nhóm các phần tử (một đoạn), và sau đó nhả khóa.

Mặc dù thiết kế này có thể giúp thông lượng tổng thể, đối với các trường hợp được tập trung nhiều hơn vào độ trễ thấp, việc phân đoạn đó có thể bị cấm.

+0

Cảm ơn. Điều này giải quyết được vấn đề của tôi. Dù sao, khi tôi kiểm tra thêm, mã trong OP của tôi không thất bại khi số lượng mục là thành viên của chuỗi này, [A200672] (http://oeis.org/A200672) ví dụ: 1, 2, 3, 5, 7, 9, 13, 17, 21, 29, 37, 45, 61, 77, 93, ... Bất kỳ ý tưởng nào tại sao? chỉ tò mò thôi. – user69715

+0

@ user69715 Đó là loại hành vi kỳ lạ mà tôi tìm thấy khi tôi cố gắng làm một điều tương tự. Tôi đoán nó chỉ là một số tương tác kỳ lạ giữa Parallel.ForEach() và BlockingCollection cơ bản, nhưng tôi không thể thực sự giải thích nó. –

2

Lý do cho sự thất bại là vì các lý do sau đây như được giải thích here

Thuật toán phân vùng làm việc mặc định của cả hai Parallel.ForEach và sử dụng PLINQ chunking để giảm thiểu chi phí đồng bộ hóa: thay hơn là lấy khóa một lần cho mỗi phần tử, nó sẽ lấy khóa, lấy một nhóm các phần tử (một đoạn), và sau đó nhả khóa.

Để có được nó để làm việc, bạn có thể thêm một phương thức trên lớp ParallelConsumer<T> của bạn để chỉ ra rằng bổ sung hoàn tất, như sau

public void StopAdding() 
    { 
     _entries.CompleteAdding(); 
    } 

Và bây giờ gọi phương pháp này sau khi for loop của bạn, như sau

 consumer.Start(); 
     for (int i = 0; i < itemCount; i++) 
     { 
      consumer.Enqueue(i); 
     } 
     consumer.StopAdding(); 

Nếu không, Parallel.ForEach() sẽ đợi ngưỡng đạt được để lấy đoạn và bắt đầu xử lý.

+0

điều đang trong quá trình sản xuất, nhiệm vụ được xếp hàng liên tục, vì vậy việc đánh dấu "StopAdding" không giúp ích gì. Cảm ơn câu trả lời của bạn, +1, nhưng tôi sẽ đi với câu trả lời khác. – user69715

+0

Rất tiếc, có vẻ như tôi chưa thể +1 – user69715