2009-12-16 16 views
13

Tôi đang viết một ứng dụng đơn giản (cho vợ tôi không kém :-P) thực hiện một số thao tác hình ảnh (thay đổi kích cỡ, dấu thời gian, vv) cho một loạt ảnh có khả năng lớn. Vì vậy, tôi đang viết một thư viện có thể làm điều này cả đồng bộ và không đồng bộ. Tôi quyết định sử dụng Event-based Asynchronous Pattern. Khi sử dụng mẫu này, bạn cần phải nâng cao một sự kiện khi công việc đã được hoàn thành. Đây là nơi tôi đang gặp vấn đề khi biết nó được thực hiện. Vì vậy, về cơ bản, trong phương pháp DownsizeAsync của tôi (phương pháp async để giảm kích thước hình ảnh) Tôi đang làm một cái gì đó như thế này:Cách tốt nhất để có nhiều chuỗi làm việc là gì và chờ đợi cho tất cả các chuỗi đó hoàn thành?

public void DownsizeAsync(string[] files, string destination) 
    { 
     foreach (var name in files) 
     { 
      string temp = name; //countering the closure issue 
      ThreadPool.QueueUserWorkItem(f => 
      { 
       string newFileName = this.DownsizeImage(temp, destination); 
       this.OnImageResized(newFileName); 
      }); 
     } 
    } 

Phần khó hiểu bây giờ là biết khi nào chúng hoàn tất.

Dưới đây là những gì tôi đã cân nhắc: Sử dụng ManualResetEvents như ở đây: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx Nhưng vấn đề tôi gặp phải là bạn chỉ có thể chờ 64 sự kiện trở xuống. Tôi có thể có nhiều hơn nhiều hình ảnh.

tùy chọn thứ hai: Có một quầy mà đếm những hình ảnh đó đã được thực hiện, và nâng cao sự kiện khi đếm đạt tổng:

public void DownsizeAsync(string[] files, string destination) 
{ 
    foreach (var name in files) 
    { 
     string temp = name; //countering the closure issue 
     ThreadPool.QueueUserWorkItem(f => 
     { 
      string newFileName = this.DownsizeImage(temp, destination); 
      this.OnImageResized(newFileName); 
      total++; 
      if (total == files.Length) 
      { 
       this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null)); 
      } 
     }); 
    } 


} 

private volatile int total = 0; 

Bây giờ này cảm thấy "hacky" và tôi không hoàn toàn chắc chắn nếu đó là chủ đề an toàn.

Vì vậy, câu hỏi của tôi là, cách tốt nhất để làm điều này là gì? Có cách nào khác để đồng bộ hóa tất cả các chủ đề? Tôi có nên không sử dụng ThreadPool không? Cảm ơn!!

CẬP NHẬT Dựa trên phản hồi trong các ý kiến ​​và từ một vài câu trả lời tôi đã quyết định áp dụng cách này:

Trước tiên, tôi đã tạo ra một phương pháp khuyến nông mà lô một đếm được vào "lô":

public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount) 
    { 
     for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount)) 
     { 
      yield return s.Take(batchCount); 
     } 
    } 

Về cơ bản, nếu bạn làm điều gì đó như thế này:

 foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10)) 
     { 
      foreach (int i in batch) 
      { 
       Console.Write("{0} ", i); 
      } 
      Console.WriteLine(); 
     } 

bạn nhận được kết quả này:

1 2 3 4 5 6 7 8 9 10 
11 12 13 14 15 16 17 18 19 20 
21 22 23 24 25 26 27 28 29 30 
31 32 33 34 35 36 37 38 39 40 
41 42 43 44 45 46 47 48 49 50 
51 52 53 54 55 56 57 58 59 60 
61 62 63 64 65 66 67 68 69 70 
71 72 73 74 75 76 77 78 79 80 
81 82 83 84 85 86 87 88 89 90 
91 92 93 94 95 

Ý tưởng là (như một người trong nhận xét đã chỉ ra) không cần phải tạo một chuỗi riêng biệt cho mỗi hình ảnh. Vì vậy, tôi sẽ ghép các hình ảnh thành [machine.cores * 2] số lô. Sau đó, tôi sẽ sử dụng phương pháp thứ hai của tôi mà chỉ đơn giản là để giữ một truy cập đi và khi truy cập đạt tổng số tôi mong đợi, tôi sẽ biết tôi đã làm xong.

Lý do Tôi tin bây giờ mà nó là trong thực tế chủ đề an toàn, là bởi vì tôi đã đánh dấu tổng biến như dễ bay hơi mà theo MSDN:

Các modifier dễ bay hơi thường được sử dụng cho một trường được truy cập bởi nhiều chủ đề mà không cần sử dụng câu lệnh khóa để truy cập hàng loạt. Sử dụng modifier ổn định đảm bảo rằng một thread lấy hầu hết các up-to-date giá trị được viết bởi một chủ đề

có nghĩa là tôi phải ở trong rõ ràng (nếu không, xin vui lòng cho tôi biết !!)

Vì vậy, đây là đoạn code tôi đang đi với:

public void DownsizeAsync(string[] files, string destination) 
    { 
     int cores = Environment.ProcessorCount * 2; 
     int batchAmount = files.Length/cores; 

     foreach (var batch in files.GetBatches(batchAmount)) 
     { 
      var temp = batch.ToList(); //counter closure issue 
      ThreadPool.QueueUserWorkItem(b => 
      { 
       foreach (var item in temp) 
       { 
        string newFileName = this.DownsizeImage(item, destination); 
        this.OnImageResized(newFileName); 
        total++; 
        if (total == files.Length) 
        { 
         this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null)); 
        } 
       } 
      }); 
     } 
    } 

Tôi mở cửa cho thông tin phản hồi như tôi không có cách nào một chuyên gia về xử lý đa luồng, vì vậy nếu có ai nhìn thấy bất kỳ vấn đề với điều này, hoặc có một ý tưởng tốt hơn, xin vui lòng cho tôi biết. (Vâng, đây chỉ là một ứng dụng được tạo ra ở nhà, nhưng tôi có một số ý tưởng về cách tôi có thể sử dụng kiến ​​thức mà tôi có được ở đây để cải thiện dịch vụ Tìm kiếm/Chỉ mục của chúng tôi tại nơi làm việc.) cảm thấy như tôi đang sử dụng cách tiếp cận đúng. Cảm ơn tất cả mọi người đã giúp đỡ của bạn.

+3

tổng số không trông an toàn cho tôi! Phương thức truy cập – RichardOD

+2

có khả năng mở rộng cao. Chỉ cần làm cho nó threadsafe sử dụng Interlocked.Increment và .Decrement .. –

+0

Trong thực tế, là 64 một giới hạn cho điều này? Ngay cả với 64 lõi vật lý, bạn sẽ gặp phải tình trạng tắc nghẽn bộ nhớ và đĩa làm suy giảm nhanh hơn nhiều khi truy cập song song. Nhưng sau đó, chúng có thể biến mất sau một hoặc hai năm. – peterchen

Trả lời

11

Điều đơn giản nhất là tạo chủ đề mới và sau đó gọi Thread.Join trên mỗi chủ đề. Bạn có thể sử dụng một semaphore hoặc một cái gì đó như nó - nhưng nó có thể dễ dàng hơn để chỉ tạo chủ đề mới.

Trong .NET 4.0, bạn có thể sử dụng Tiện ích mở rộng song song để thực hiện việc này khá dễ dàng với các tác vụ.

Như một sự thay thế mà sẽ sử dụng threadpool, bạn có thể tạo ra một đại biểu và gọi BeginInvoke vào nó, để trở về một IAsyncResult - sau đó bạn có thể nhận được WaitHandle cho mỗi kết quả thông qua AsyncWaitHandle bất động sản, và gọi WaitHandle.WaitAll.

CHỈNH SỬA: Như được chỉ ra trong nhận xét, bạn chỉ có thể gọi WaitAll với tối đa 64 tay cầm cùng một lúc trên một số triển khai. Các giải pháp thay thế có thể được gọi số WaitOne trên mỗi người trong số họ lần lượt hoặc gọi WaitAll với các lô. Nó sẽ không thực sự quan trọng, miễn là bạn đang làm nó từ một sợi mà không phải là sẽ ngăn chặn các threadpool. Cũng lưu ý rằng bạn không thể gọi số WaitAll từ chuỗi STA.

+2

Jon- bạn đang quên về giới hạn 64 trên WaitAll. – RichardOD

+0

@RichardOD: Cảm ơn, đã thêm ghi chú về điều đó. –

2

.Net 4.0 giúp cho việc đa luồng trở nên dễ dàng hơn (mặc dù bạn vẫn có thể tự chụp mình với các tác dụng phụ).

+1

Thậm chí dễ dàng hơn? Đa luồng không phải là dễ dàng! – RichardOD

+0

Ok ... đa luồng trong C khó hơn. Về mặt lý thuyết Turing-tương đương hoặc những gì có bạn, khái niệm tương tự nếu không giống nhau, nhưng các dòng mã bổ sung làm được trong cách hiểu cốt lõi. –

2

Tôi đã sử dụng SmartThreadPool với nhiều succes để đối phó với vấn đề này. Ngoài ra còn có một trang web Codeplex về de lắp ráp.

SmartThreadPool có thể trợ giúp với các sự cố khác cũng như một số luồng không thể chạy cùng lúc trong khi những người khác có thể chạy.

1

Một tùy chọn khác là sử dụng Ống.

Bạn đăng tất cả công việc cần thực hiện lên đường ống và sau đó đọc dữ liệu từ đường ống từ mỗi sợi. Khi đường ống trống, bạn đã hoàn thành, chủ đề kết thúc và mọi người đều hạnh phúc (tất nhiên hãy đảm bảo bạn đầu tiên sản xuất tất cả công việc, sau đó tiêu thụ nó)

11

Bạn vẫn muốn sử dụng ThreadPool vì nó sẽ quản lý số lượng chủ đề nó chạy đồng thời. Tôi chạy vào một vấn đề tương tự gần đây và giải quyết nó như thế này:

var dispatcher = new ThreadPoolDispatcher(); 
dispatcher = new ChunkingDispatcher(dispatcher, 10); 

foreach (var image in images) 
{ 
    dispatcher.Add(new ResizeJob(image)); 
} 

dispatcher.WaitForJobsToFinish(); 

Các IDispatcher và IJob trông như thế này:

public interface IJob 
{ 
    void Execute(); 
} 

public class ThreadPoolDispatcher : IDispatcher 
{ 
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>(); 

    public void Dispatch(IJob job) 
    { 
     var resetEvent = CreateAndTrackResetEvent(); 
     var worker = new ThreadPoolWorker(job, resetEvent); 
     ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback)); 
    } 

    private ManualResetEvent CreateAndTrackResetEvent() 
    { 
     var resetEvent = new ManualResetEvent(false); 
     resetEvents.Add(resetEvent); 
     return resetEvent; 
    } 

    public void WaitForJobsToFinish() 
    { 
     WaitHandle.WaitAll(resetEvents.ToArray() ?? new ManualResetEvent[] { }); 
     resetEvents.Clear(); 
    } 
} 

Và sau đó sử dụng một trang trí để đoạn việc sử dụng ThreadPool:

public class ChunkingDispatcher : IDispatcher 
{ 
    private IDispatcher dispatcher; 
    private int numberOfJobsDispatched; 
    private int chunkSize; 

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize) 
    { 
     this.dispatcher = dispatcher; 
     this.chunkSize = chunkSize; 
    } 

    public void Dispatch(IJob job) 
    { 
     dispatcher.Dispatch(job); 

     if (++numberOfJobsDispatched % chunkSize == 0) 
      WaitForJobsToFinish(); 
    } 

    public void WaitForJobsToFinish() 
    { 
     dispatcher.WaitForJobsToFinish(); 
    } 
} 

Tính năng trừu tượng IDispatcher hoạt động khá tốt để trao đổi kỹ thuật luồng của bạn. Tôi có một thực hiện khác đó là một SingleThreadedDispatcher và bạn có thể làm cho một phiên bản ThreadStart như Jon Skeet đề xuất.Sau đó, nó dễ dàng để chạy mỗi một và xem những loại hiệu suất bạn nhận được. SingleThreadedDispatcher là tốt khi gỡ lỗi mã của bạn hoặc khi bạn không muốn giết bộ xử lý trên hộp của bạn.

Edit: Tôi quên để thêm mã cho ThreadPoolWorker:

public class ThreadPoolWorker 
{ 
    private IJob job; 
    private ManualResetEvent doneEvent; 

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent) 
    { 
     this.job = job; 
     this.doneEvent = doneEvent; 
    } 

    public void ThreadPoolCallback(object state) 
    { 
     try 
     { 
      job.Execute(); 
     } 
     finally 
     { 
      doneEvent.Set(); 
     } 
    } 
} 
2

tôi sử dụng một phương pháp hữu ích tĩnh để kiểm tra tất cả chờ đợi cá nhân xử lý ..

public static void WaitAll(WaitHandle[] handles) 
    { 
     if (handles == null) 
      throw new ArgumentNullException("handles", 
       "WaitHandle[] handles was null"); 
     foreach (WaitHandle wh in handles) wh.WaitOne(); 
    } 

Sau đó, trong chính của tôi thread, tôi tạo ra một danh sách các xử lý chờ đợi, và cho mỗi đại biểu tôi đặt trong hàng đợi ThreadPool của tôi, tôi thêm xử lý chờ đợi vào danh sách ...

List<WaitHandle> waitHndls = new List<WaitHandle>(); 
foreach (iterator logic) 
{ 
     ManualResetEvent txEvnt = new ManualResetEvent(false); 

     ThreadPool.QueueUserWorkItem(
      delegate 
       { 
        try { // Code to process each task... } 
        // Finally, set each wait handle when done 
        finally { lock (locker) txEvnt.Set(); } 
       }); 
     waitHndls.Add(txEvnt); // Add wait handle to List 
} 
util.WaitAll(waitHndls.ToArray()); // Check all wait Handles in List 
5

Giải pháp đơn giản và hiệu quả nhất là sử dụng bộ đếm và làm cho chúng an toàn. Điều này sẽ tiêu tốn ít bộ nhớ và có thể mở rộng lên đến con số cao hơn của chủ đề

Đây là một mẫu

int itemCount = 0; 
for (int i = 0; i < 5000; i++) 
{ 
    Interlocked.Increment(ref itemCount); 

    ThreadPool.QueueUserWorkItem(x=>{ 
     try 
     { 
      //code logic here.. sleep is just for demo 
      Thread.Sleep(100); 
     } 
     finally 
     { 
      Interlocked.Decrement(ref itemCount); 
     } 
    }); 
} 

while (itemCount > 0) 
{ 
    Console.WriteLine("Waiting for " + itemCount + " threads..."); 
    Thread.Sleep(100); 
} 
Console.WriteLine("All Done!"); 
+1

+1. Tôi sử dụng mẫu này, mặc dù tôi kiểm tra kết quả của Interlocked.Decrement() để xem nếu chúng ta đã nhấn không và thiết lập một sự kiện nếu như vậy để chỉ ra rằng tất cả các mục được hoàn thành. Bằng cách đó bạn không cần phải thăm dò ý kiến ​​trên itemCount. –

+0

Tôi thích phương pháp này, nhưng tôi tự hỏi nếu điều này là cần thiết nếu biến chúng tôi đang tăng được đánh dấu là dễ bay hơi. – BFree

+1

dễ bay hơi đảm bảo biến không được lưu trong bộ nhớ cache và các trình tiếp cận là nguyên tử, nhưng không đảm bảo rằng hoạt động đầy đủ của việc đọc + thay đổi + ghi sẽ là nguyên tử –

1

Tôi đề nghị đưa những hình ảnh bị ảnh hưởng trong một hàng đợi và khi bạn đọc từ hàng đợi khởi động một thread và chèn thuộc tính System.Threading.Thread.ManagedThreadId của nó vào từ điển cùng với tên tệp. Bằng cách này, giao diện người dùng của bạn có thể liệt kê cả tệp đang chờ xử lý và đang hoạt động.

Khi mỗi luồng hoàn thành, nó sẽ gọi một thường trình gọi lại, trả lại ManagedThreadId của nó. Lệnh gọi lại này (được chuyển làm đại biểu cho chuỗi) sẽ xóa id của chuỗi khỏi từ điển, khởi chạy một chuỗi khác từ hàng đợi và cập nhật giao diện người dùng.

Khi cả hàng đợi và từ điển đều trống, bạn đã hoàn tất.

Hơi phức tạp hơn nhưng theo cách này bạn có giao diện người dùng đáp ứng, bạn có thể dễ dàng kiểm soát số lượng chủ đề đang hoạt động và bạn có thể xem những gì đang diễn ra. Thu thập số liệu thống kê. Nhận được ưa thích với WPF và đưa lên thanh tiến trình cho mỗi tập tin. Cô không thể không bị ấn tượng.

Các vấn đề liên quan