2012-01-25 27 views
6

Tôi có một vài phương pháp máy phát vô hạn, bao gồm một số máy phát điện chạy dài và vô hạn.Những cạm bẫy cố gắng sử dụng PLINQ trên các máy phát điện chạy dài?

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time 
     yield return LongRunningFunction(); 
} 
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time 
     yield return OtherLongRunningFunction(); 
} 

Mục tiêu của tôi là có chuỗi vô hạn kết hợp các mục từ hai ví dụ. Đây là những gì tôi đã cố gắng, sử dụng PLINQ:

IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() } 
      .AsParallel() 
      .WithMergeOptions(ParallelMergeOptions.NotBuffered) 
      .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 
      .SelectMany(source => source.GetRequests()); 

này dường như thích hợp kết hợp hai IEnumerables thành một hình mới, với các mặt hàng từ IEnumerable # 1 và # 2 là có sẵn bất cứ khi nào chúng xuất hiện trong một trong hai nguồn IEnumerables:

//assuming ExampleTwo yields TWO but happens roughly 5 times 
//less often then ExampleOne 
Example output: one one one one one TWO one one one one one one TWO 

Tuy nhiên, nó có vẻ như đôi khi (thường là sau nhiều giờ của chạy) OtherLongRunningFunction() sẽ đi cho một thời gian dài mà không trả lại, và trong điều kiện mà khó tái sản xuất, chuỗi combined sẽ b khóa trên nó thay vì tiếp tục trả về kết quả từ LongRunningFunction đầu tiên. Dường như mặc dù truy vấn song song kết hợp đã bắt đầu bằng cách sử dụng hai luồng, nó đã quyết định chuyển sang một luồng sau này.

Suy nghĩ đầu tiên của tôi là "đây có thể là công việc cho RX Observable.Merge chứ không phải cho PLINQ". Nhưng tôi đánh giá cao cả hai câu trả lời cho thấy các cách thay thế chính xác để xử lý tình huống này cũng như giải thích về cơ chế PLINQ có thể thay đổi mức độ của các giờ song song sau khi bắt đầu một truy vấn.

+0

Tôi không thể nói rằng tôi có nhiều kinh nghiệm với PLINQ, vì vậy điều này chủ yếu sẽ không đủ tiêu chuẩn đoán: Có vẻ như bạn giả sử kết hợp 'ExampleOne()' và 'ExampleTwo()' và chạy 'AsParallel()' trên kết quả 'IEnumerable' sẽ thay thế hoàn toàn bằng cách trả lại một kết quả từ kết quả thứ nhất và một từ kết quả thứ hai. Nó có thể là giả định này là sai? Nếu vậy, bạn có thể kết thúc trong một tình huống mà chuỗi đang được xử lý trông giống như _1st 2nd 1st 2nd 1st 1st 1st 2nd 1st 2nd 1st 1st 2nd 1st 2nd 2nd 2nd ..._ Và điều đó có thể giải thích tại sao nó _appears_ như thể bạn đang bị mắc kẹt trên số hai. – Nailuj

+0

@Nailuj Nó thực sự giống như tôi mong đợi * 1 1 1 1 1 1 1 2 1 1 1 1 1 1 1 2 * vv, 2s là khá hiếm, và có thể có thời gian dài của thời gian giữa 2s liên tiếp. Tôi hy vọng chuỗi kết hợp sẽ tiếp tục quay trở lại 1s, và thực sự, đây là những gì thực sự xảy ra đối với hầu hết các phần. Nhưng đôi khi nó cũng ngừng quay trở lại 1s. – Jimmy

+0

Một suy nghĩ khác: có thể PLINQ bắt đầu bằng cách xử lý số lượng bằng 1 và 2 song song, nhưng vì kết thúc 1s nhanh hơn nhiều, nó sẽ xuất hiện dưới dạng chuỗi bạn mô tả. Tuy nhiên, bất cứ khi nào 1 kết thúc, "điểm" đó sẽ xen kẽ được lấp đầy bởi 1 và 2 tương ứng. Kết quả sẽ là ngay từ đầu, nó sẽ xuất hiện như thể bạn nhận được nhiều 1s và chỉ là 2 hiếm, nhưng về lâu dài "hàng đợi" các nhiệm vụ đang được xử lý sẽ được lấp đầy với 2 giây, và do đó dường như bị mắc kẹt ? Nếu không chính xác như thế này, nó có liên quan gì không? Bạn đã thực sự xác minh :-) – Nailuj

Trả lời

2

Đây là cách Rx để làm điều đó, và quả thật, nó sử dụng Merge:

IObservable<T> LongRunningFunction() 
{ 
    return Observable.Start(() => { 
     // Calculate some stuff 
     return blah; 
    }, Scheduler.TaskPoolScheduler); 
} 

Observable.Merge(
    Observable.Defer(LongRunningFunction).Repeat(), 
    Observable.Defer(OtherLongRunningFunction).Repeat(), 
).Subscribe(x => { 
    Console.WriteLine("An item: {0}", x); 
}); 
1

Nếu bạn muốn những lợi ích của TPL đặc biệt cho các nhiệm vụ với tải khác nhau (những gì sẽ xảy ra khi bạn đăng ký khối, và một số lượng mặt hàng đã được sản xuất - bạn có nên ngừng sản phẩm?), tôi khuyên bạn nên TPL DataFlow.

Nếu bạn muốn làm điều đó với Rx, cho rất dài chạy nhiệm vụ tính toán, tốt nhất là không để chặn các hồ bơi thread:

var stream = Observable.Merge(ExampleTwo().ToObservable(Scheduler.NewThread), ExampleOne().ToObservable(Scheduler.NewThread)); 

stream.Subscribe(...); 
+0

Nếu bạn đang chặn trong khi sử dụng Rx, bạn đang thực hiện Nó sai (tm). Độc quyền hóa nhóm công tác không tệ như người ta nghĩ vì nó là một việc thực hiện ăn cắp nhiệm vụ. Điều đó đang được nói, nếu bạn * biết * bạn sẽ đậu một loạt các luồng trên luồng công việc chuyên sâu của CPU, bạn có thể làm điều NewThread. –

+0

@PaulBetts Vâng nói. Bạn đã viết trong phiên bản TaskPool, vì vậy tôi đưa vào phiên bản thread. Hiểu biết về các tùy chọn luôn tốt. – Asti

1

Liên quan đến cơ chế PLINQ:

tôi gặp phải cùng vấn đề: Tôi có một chuỗi có mục đòi hỏi thời gian xử lý không đồng đều, một số trong đó dài hơn theo thứ tự độ lớn. Tôi kinh nghiệm nạn đói, tái tạo nhiều hơn trên bộ vi xử lý 8 nhân so với 4 lõi, mặc dù nó có thể xảy ra trên 4 lõi cũng như sau nhiều giờ xử lý. Một số chủ đề có thể nhận công việc trở lại sau một thời gian. Lưu ý rằng chunking động được sử dụng, như trong ví dụ.

Quan sát: Có thể xảy ra tình trạng đói hơn khi hoàn thành các mục công việc rất dài liên tiếp.

MSDN chủ đề Parallel Loops tỏ một số ánh sáng:

Hãy cẩn thận nếu bạn sử dụng vòng song song với bước cá nhân mà mất vài giây hoặc lâu hơn. Điều này có thể xảy ra với khối lượng công việc I/O-bound cũng như tính toán dài. Nếu các vòng lặp mất một thời gian dài, bạn có thể trải nghiệm sự tăng trưởng không bị ràng buộc của các luồng công nhân do một heuristic để ngăn chặn nạn đói luồng được sử dụng bởi logic tiêm thread của lớp .NET ThreadPool. Điều này heuristic đều đặn tăng số lượng các chủ đề công nhân khi các mục công việc của hồ bơi hiện tại chạy trong thời gian dài.Động lực là để thêm nhiều chủ đề hơn trong trường hợp tất cả mọi thứ trong hồ bơi thread bị chặn. Thật không may, nếu công việc thực sự là tiến hành, nhiều chủ đề có thể không nhất thiết phải là những gì bạn muốn. Khuôn khổ .NET không thể phân biệt giữa hai tình huống này.

Tôi vẫn không biết chi tiết, nhưng tôi nghĩ rằng các thuật toán cơ bản của ThreadPool không lý do tốt cho các mục công việc đang chạy rất lâu, không đáp ứng các chủ đề cho các lần lặp tiếp theo do giới hạn trên không được điều chỉnh thích hợp, do đó có lặp đi lặp lại hàng đợi. Tôi không có quyền truy cập Visual Studio vào máy 8 lõi nơi sự cố có thể tái tạo dễ dàng hơn. Tôi vẫn chưa thể tái tạo vấn đề trong Visual Studio gỡ lỗi trên một máy tính 4 lõi. Điều tra tiếp tục.

Để biết thêm chi tiết, chủ đề "Does the Task Parallel Library (or PLINQ) take other processes into account?" có liên quan cao.

Các vấn đề liên quan