2011-01-12 38 views
14

Về chủ đề chờ cho đến khi các tác vụ hoàn tất và đồng bộ hóa chuỗi.Parallel.ForEach - Hủy bỏ duyên dáng

Tôi hiện đang có một lần lặp tôi đã đính kèm trong Parallel.ForEach. Trong ví dụ dưới đây tôi đã đặt ra một số câu hỏi trong các ý kiến ​​về cách tốt nhất để xử lý việc chấm dứt duyên dáng của vòng lặp (.NET 4.0);

private void myFunction() 
    { 

     IList<string> iListOfItems = new List<string>(); 
     // populate iListOfItems 

     CancellationTokenSource cts = new CancellationTokenSource(); 

     ParallelOptions po = new ParallelOptions(); 
     po.MaxDegreeOfParallelism = 20; // max threads 
     po.CancellationToken = cts.Token; 

     try 
     { 
      var myWcfProxy = new myWcfClientSoapClient(); 

      if (Parallel.ForEach(iListOfItems, po, (item, loopsate) => 
      { 
       try 
       { 
        if (_requestedToStop) 
         loopsate.Stop(); 
        // long running blocking WS call, check before and after 
        var response = myWcfProxy.ProcessIntervalConfiguration(item); 
        if (_requestedToStop) 
         loopsate.Stop(); 

        // perform some local processing of the response object 
       } 
       catch (Exception ex) 
       { 
        // cannot continue game over. 
        if (myWcfProxy.State == CommunicationState.Faulted) 
        { 
         loopsate.Stop(); 
         throw; 
        } 
       } 

       // else carry on.. 
       // raise some events and other actions that could all risk an unhanded error. 

      } 
      ).IsCompleted) 
      { 
       RaiseAllItemsCompleteEvent(); 
      } 
     } 
     catch (Exception ex) 
     { 
      // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the 
      // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception? 
      // Do I need to call cts.Cancel here? 

      // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that? 

      // do i need to call cts.Dispose() ? 

      MessageBox.Show(Logging.FormatException(ex)); 
     } 
     finally 
     { 

      if (myWcfProxy != null) 
      { 
      // possible race condition with the for-each threads here unless we wait for them to terminate. 
       if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted) 
        myWcfProxy.Abort(); 

       myWcfProxy.Close(); 
      } 

      // possible race condition with the for-each threads here unless we wait for them to terminate. 
      _requestedToStop = false; 

     } 

    } 

Mọi trợ giúp sẽ được đánh giá cao nhất. Các cuộc thảo luận tài liệu MSDN của ManualResetEventSlim và cancelToken.WaitHandle. nhưng không chắc chắn làm thế nào để dây chúng vào này, dường như đang đấu tranh hiểu các ví dụ MSDN như hầu hết không áp dụng.

Trả lời

8

Tôi đã giả lập một số mã bên dưới có thể trả lời câu hỏi của bạn. Điểm cơ bản là bạn nhận được fork/join parallelism với Parallel.ForEach, vì vậy bạn không cần phải lo lắng về điều kiện chủng tộc bên ngoài nhiệm vụ song song (khối thread gọi cho đến khi các tác vụ đã hoàn thành, thành công hay cách khác). Bạn chỉ muốn đảm bảo sử dụng biến LoopState (đối số thứ hai cho lambda) để kiểm soát trạng thái vòng lặp của bạn.

Nếu bất kỳ sự lặp lại nào của vòng lặp đã ném một ngoại lệ chưa được giải quyết, vòng lặp tổng thể sẽ làm tăng AggregateException bị bắt ở cuối.

liên kết khác có đề cập đến chủ đề này:

Parallel.ForEach throws exception when processing extremely large sets of data

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Does Parallel.ForEach limits the number of active threads?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using System.ServiceModel; 

namespace Temp 
{ 
    public class Class1 
    { 
     private class MockWcfProxy 
     { 
      internal object ProcessIntervalConfiguration(string item) 
      { 
       return new Object(); 
      } 

      public CommunicationState State { get; set; } 
     } 

     private void myFunction() 
     { 

      IList<string> iListOfItems = new List<string>(); 
      // populate iListOfItems 

      CancellationTokenSource cts = new CancellationTokenSource(); 

      ParallelOptions po = new ParallelOptions(); 
      po.MaxDegreeOfParallelism = 20; // max threads 
      po.CancellationToken = cts.Token; 

      try 
      { 
       var myWcfProxy = new MockWcfProxy(); 

       if (Parallel.ForEach(iListOfItems, po, (item, loopState) => 
        { 
         try 
         { 
          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // long running blocking WS call, check before and after 
          var response = myWcfProxy.ProcessIntervalConfiguration(item); 

          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // perform some local processing of the response object 
         } 
         catch (Exception ex) 
         { 
          // cannot continue game over. 
          if (myWcfProxy.State == CommunicationState.Faulted) 
          { 
           loopState.Stop(); 
           throw; 
          } 

          // FYI you are swallowing all other exceptions here... 
         } 

         // else carry on.. 
         // raise some events and other actions that could all risk an unhanded error. 
        } 
       ).IsCompleted) 
       { 
        RaiseAllItemsCompleteEvent(); 
       } 
      } 
      catch (AggregateException aggEx) 
      { 
       // This section will be entered if any of the loops threw an unhandled exception. 
       // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
       // to see those (if you want). 
      } 
      // Execution will not get to this point until all of the iterations have completed (or one 
      // has failed, and all that were running when that failure occurred complete). 
     } 

     private void RaiseAllItemsCompleteEvent() 
     { 
      // Everything completed... 
     } 
    } 
} 
+0

Cảm ơn vì sự hiểu biết. Tôi nên nói rằng tại điểm mà bạn đúng chỉ ra "nuốt tất cả các trường hợp ngoại lệ khác ở đây" Tôi đang thực hiện một cuộc gọi đăng nhập sẽ đăng nhập dịch vụ web hoặc ngoại lệ WCF phía khách hàng. Mục đích là để vòng lặp tiếp tục nếu ngoại lệ không dẫn đến một proxy WCF không hợp lệ. Tôi dự đoán lỗi hết hạn hoặc ngoại lệ lỗi phía máy chủ. không cho chức năng cụ thể này, cần bất kỳ chức năng giảm thiểu nào trong quá trình bắt. tuy nhiên, chúng tôi sẽ xem xét tệp nhật ký và bất kỳ ngoại lệ nào như vậy sẽ được điều tra. – Terry

+0

Điều gì khiến tôi nhầm lẫn về Parallel.ForEach là tôi cũng giả định rằng nó phải là một cuộc gọi chặn cho đến khi tất cả các luồng trong hồ bơi hoàn thành (ngoại lệ được lưu trong bộ nhớ cache hoặc không), tuy nhiên số luồng được báo cáo là đang chạy tại một điểm ngắt được đặt trong ví dụ catch của bạn (AggregateException aggEx) khối, sẽ báo cáo là 20 chủ đề trong VS 2010 thread xem. Vì vậy, tôi đã sysinternals ra và nhìn vào thực thi vshost đang được gỡ lỗi và nó cũng cho thấy 22 chủ đề bao gồm cả giao diện người dùng và máy bơm tin nhắn. – Terry

+0

Hơn nữa, các sự kiện được nâng lên trong vòng lặp, đã ném thêm ngoại lệ sau khi hàm đã thực hiện và khối cuối cùng đã chạy. – Terry

Các vấn đề liên quan