2017-11-21 79 views
5

Tôi sẽ đánh giá cao sự giúp đỡ của các chuyên gia PLYNQ ngoài kia! Tôi sẽ mất thời gian xem xét câu trả lời, tôi có một hồ sơ được thành lập hơn về math.SE.Các lý do có thể có tại sao ParallelQuery.Aggregate không chạy song song

Tôi có đối tượng thuộc loại ParallelQuery<List<string>>, trong đó có 44 danh sách mà tôi muốn xử lý song song (năm lần). Quá trình của tôi có chữ ký như

private ProcessResult Process(List<string> input) 

Quá trình xử lý sẽ trả lại kết quả, là một cặp giá trị Boolean, như dưới đây.

private struct ProcessResult 
    { 
     public ProcessResult(bool initialised, bool successful) 
     { 
      ProcessInitialised = initialised; 
      ProcessSuccessful = successful; 
     } 

     public bool ProcessInitialised { get; } 
     public bool ProcessSuccessful { get; } 
    } 

Sự cố. Cho một số IEnumerable<List<string>> processMe, truy vấn PLYNQ của tôi sẽ cố gắng triển khai phương thức này: https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx. Nó được viết là

processMe.AsParallel() 
     .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult> 
      (
       new ConcurrentStack<ProcessResult>, //aggregator seed 
       (agg, input) => 
       {       //updating the aggregate result 
        var res = Process(input); 
        agg.Push(res); 
        return agg; 
       }, 
       agg => 
       {       //obtain the result from the aggregator agg 
        ProcessResult res; // (in this case just the most recent result**) 
        agg.TryPop(out res); 
        return res; 
       } 
      ); 

Thật không may nó không chạy song song, chỉ tuần tự. (** lưu ý rằng việc thực hiện điều này không làm cho "cảm giác", tôi chỉ cố gắng để có được những parallelisation làm việc cho bây giờ.)


Tôi đã thử một thực hiện hơi khác nhau, trong đó đã chạy song song , nhưng không có tập hợp. Tôi đã xác định phương pháp tổng hợp (về cơ bản là Boolean AND trên cả hai phần của ProcessResult, tức là tổng hợp ([A1, A2], [B1, B2]) ≡ [A1 & & B1, A2 & & B2]).

private static ProcessResult AggregateProcessResults 
     (ProcessResult aggregate, ProcessResult latest) 
    { 
     bool ini = false, suc = false; 
     if (aggregate.ProcessInitialised && latest.ProcessInitialised) 
      ini = true; 
     if (aggregate.ProcessSuccessful && latest.ProcessSuccessful) 
      suc = true; 


     return new ProcessResult(ini, suc); 
    } 

Và sử dụng các truy vấn PLYNQ https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true), 
    (res, input) => Process(input), 
    (agg, latest) => AggregateProcessResults(agg, latest), 
    agg   => agg 

Vấn đề ở đây là mã AggregateProcessResults không bao giờ đánh, đối với một số lý do, tôi tránh khỏi thất bại mà kết quả sẽ ...

Cảm ơn bạn đã đọc, bất kỳ trợ giúp nào được đánh giá cao :)

+0

Nếu bạn muốn tính toán một giá trị mới cho mỗi mục trong trình tự, bạn nên sử dụng 'Select', không phải' Aggregate'. Khi bạn sử dụng các hoạt động chính xác cho công việc mà bạn đang cố gắng làm bạn sẽ tìm thấy hệ thống sẽ có thể thực hiện nó hiệu quả hơn nhiều. – Servy

+0

Bạn có bao nhiêu mục trong bộ sưu tập của mình? (Chỉ có 44?) Bạn có bao nhiêu lõi CPU? Bởi vì chạy một truy vấn trên nhiều Treads và nhân lõi CPU đòi hỏi phải chuẩn bị phức tạp. Bộ sưu tập phải được chia thành nhiều phần như nhiều lõi CPU có sẵn, chạy các nhiệm vụ trên các luồng và cuối cùng là tổng hợp các kết quả. .NET đủ thông minh để không làm nhiều việc để làm mọi thứ chậm hơn ... – Major

+0

@Major Tôi có 22000 chuỗi, được chia thành 500, cung cấp 44 danh sách s. Tôi bị giới hạn chạy năm tiến trình đồng thời – Szmagpie

Trả lời

2

Quá tải Aggregate bạn sử dụng thực sự sẽ không chạy trong p song song, theo thiết kế. Bạn vượt qua hạt giống, sau đó chức năng bước, nhưng đối số với chức năng bước (agg) là bộ tích lũy được nhận từ trước đó bước. Vì lý do đó, nó vốn có tuần tự (kết quả của bước trước đó là đầu vào cho bước tiếp theo) và không song song. Không chắc chắn lý do tại sao tình trạng quá tải này được đưa vào ParallelEnumerable, nhưng có thể có lý do.

Thay vào đó, sử dụng một quá tải:

var result = processMe 
.AsParallel() 
.Aggregate 
(
    // seed factory. Each partition will call this to get its own seed 
    () => new ConcurrentStack<ProcessResult>(), 
    // process element and update accumulator 
    (agg, input) => 
    {           
     var res = Process(input); 
     agg.Push(res); 
     return agg; 
    }, 
    // combine accumulators from different partitions 
    (agg1, agg2) => { 
     agg1.PushRange(agg2.ToArray()); 
     return agg1; 
    }, 
    // reduce 
    agg => 
    { 
     ProcessResult res; 
     agg.TryPop(out res); 
     return res; 
    } 
); 
Các vấn đề liên quan