2013-02-11 28 views
9

Tôi muốn sử dụng .NET iterator với các nhiệm vụ song song/đang chờ ?. Một cái gì đó như thế này:Cách lấy từ các tác vụ song song trong .NET 4.5

IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source) 
{ 
    Parallel.ForEach(
     source, 
     s=> 
     { 
      // Ordering is NOT important 
      // items can be yielded as soon as they are done     
      yield return ExecuteOrDownloadSomething(s); 
     } 
} 

Thật không may .NET không thể xử lý điều này. Câu trả lời hay nhất cho đến nay bởi @svick - sử dụng AsParallel().

THƯỞNG: Bất kỳ mã async/await đơn giản nào triển khai nhiều nhà xuất bản và một người đăng ký duy nhất? Người đăng ký sẽ kiếm được và các quán rượu sẽ xử lý. (chỉ dành cho các thư viện chính)

Trả lời

11

Điều này có vẻ giống như một công việc cho PLINQ:

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s)); 

Điều này sẽ thực hiện ủy quyền song song với một số chuỗi giới hạn, trả về mỗi kết quả ngay sau khi hoàn thành.

Nếu phương pháp ExecuteOrDownloadSomething() được IO-bound (ví dụ nó thực sự tải một cái gì đó) và bạn không muốn lãng phí chủ đề, sau đó sử dụng async-await thể có ý nghĩa, nhưng nó sẽ phức tạp hơn.

Nếu bạn muốn tận dụng tối đa async, bạn không nên trả lại IEnumerable, vì nó đồng bộ (nghĩa là nó chặn nếu không có mặt hàng nào). Những gì bạn cần là một số loại bộ sưu tập đồng bộ, và bạn có thể sử dụng ISourceBlock (đặc biệt là TransformBlock) từ TPL Dataflow cho rằng:

ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source) 
{ 
    var block = new TransformBlock<TSrc, TDest>(
     async s => await ExecuteOrDownloadSomethingAsync(s), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    foreach (var item in source) 
     block.Post(item); 

    block.Complete(); 

    return block; 
} 

Nếu nguồn là “chậm” (tức là bạn muốn bắt đầu xử lý các kết quả từ Foo() trước khi lặp lại source hoàn tất), bạn có thể muốn di chuyển các cuộc gọi foreachComplete() đến một số riêng biệt Task. Thậm chí giải pháp tốt hơn cũng sẽ là làm cho source cũng thành một ISourceBlock<TSrc>.

+0

Cảm ơn, nhưng bạn có thể cho một ví dụ về cách này có thể được giải quyết với async/await? Cám ơn! – Yurik

+0

@Yurik Bạn có thể giải thích tại sao bạn muốn điều đó không? – svick

+0

Chủ yếu là vì tôi cảm thấy nó sẽ giúp tôi hiểu cú pháp chờ đợi mới cho một vấn đề không phải là "không đồng bộ 101", mà là một kịch bản thế giới thực. – Yurik

0

Trong thư viện không đồng bộ của nhóm rô bốt MS, chúng có các nguyên thủy đồng thời cho phép sử dụng trình lặp để tạo mã không đồng bộ.

Thư viện (CCR) là miễn phí (Không sử dụng miễn phí). Một bài báo giới thiệu đẹp có thể được tìm thấy ở đây: Concurrent affairs

Có lẽ bạn có thể sử dụng Thư viện này cùng với thư viện nhiệm vụ Net, hoặc nó sẽ khuyến khích bạn 'roll của riêng bạn'

+0

Ông có thể giải thích như thế nào chính xác bạn sẽ sử dụng CCR đây? – svick

+0

Bài viết tôi trích dẫn có thể giải thích nó tốt hơn tôi có thể.Nếu bạn nhìn vào nó và kiểm tra hình: 'Hình 6 SerialAsyncDemo' nó có một ví dụ mã gần như chính xác như những gì OP hỏi: Các hoạt động không đồng bộ bằng cách sử dụng trình lặp .Net để sinh lời. Tôi thừa nhận rằng tôi sẽ nghĩ rằng cú pháp này lặp, mặc dù thông minh cho thời gian của nó, bây giờ chủ yếu là thay thế bởi cú pháp async/await – Toad

1

Vì vậy, nó xuất hiện những gì bạn thực sự muốn làm là sắp xếp thứ tự các tác vụ dựa trên thời điểm hoàn thành. Đây không phải là terribly phức tạp:

public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks) 
{ 
    var input = tasks.ToList(); 

    var output = input.Select(task => new TaskCompletionSource<T>()); 
    var collection = new BlockingCollection<TaskCompletionSource<T>>(); 
    foreach (var tcs in output) 
     collection.Add(tcs); 

    foreach (var task in input) 
    { 
     task.ContinueWith(t => 
     { 
      var tcs = collection.Take(); 
      switch (task.Status) 
      { 
       case TaskStatus.Canceled: 
        tcs.TrySetCanceled(); 
        break; 
       case TaskStatus.Faulted: 
        tcs.TrySetException(task.Exception.InnerExceptions); 
        break; 
       case TaskStatus.RanToCompletion: 
        tcs.TrySetResult(task.Result); 
        break; 
      } 
     } 
     , CancellationToken.None 
     , TaskContinuationOptions.ExecuteSynchronously 
     , TaskScheduler.Default); 
    } 

    return output.Select(tcs => tcs.Task); 
} 

Vì vậy, ở đây chúng tôi tạo ra một TaskCompletionSource cho từng nhiệm vụ đầu vào, sau đó đi qua từng nhiệm vụ và thiết lập một sự tiếp nối mà lấy nguồn hoàn tiếp theo từ một BlockingCollection và đặt nó kết quả. Nhiệm vụ đầu tiên hoàn thành lấy các tcs đầu tiên được trả về, nhiệm vụ thứ hai hoàn thành nhận được các tcs thứ hai được trả lại, v.v.

Bây giờ mã của bạn trở nên khá đơn giản:

var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item)) 
    .Order(); 
foreach(var task in tasks) 
{ 
    var result = task.Result;//or you could `await` each result 
    //.... 
} 
+0

Cảm ơn, nhưng những gì tôi cần là để có được một dòng các đối tượng được xử lý như sản lượng từ một phương pháp. Những gì bạn cung cấp về cơ bản là một viết lại của Parallel.ForEach(). – Yurik

+0

@Yurik Nếu bạn không cần phải đợi tất cả các mục được thực hiện, bạn có thể xóa 'WhenAll' /' WaitAll', nhưng khác hơn là tôi không thấy cách 'Select' không làm những gì bạn cần trong và của chính nó. Bạn có một chuỗi các mục, và bạn muốn biến nó thành một chuỗi các nhiệm vụ, một cho mỗi mục. Làm thế nào để 'Chọn (item => LongRunningOperation (item))' không đáp ứng nhu cầu của bạn khi nó trả về một chuỗi các nhiệm vụ? – Servy

+0

Trong trường hợp đó, thứ tự của các mục sẽ giống như bản gốc, có thể không hiệu quả. Tôi không bận tâm về thứ tự sản phẩm. – Yurik

Các vấn đề liên quan