7

Tôi đang vật lộn với cách tốt nhất để triển khai đường ống xử lý của mình.hàng đợi của nhà sản xuất/người tiêu dùng

Nhà sản xuất thức ăn chăn nuôi của tôi hoạt động với BlockingQueue. Về phía người tiêu dùng, tôi thăm dò ý kiến ​​hàng đợi, bọc những gì tôi nhận được trong một nhiệm vụ Runnable, và gửi nó đến một ExecutorService.

while (!isStopping()) 
{ 
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS); 
    if (work == null) 
    { 
     break; 
    } 
    executorService.execute(new Worker(work)); // needs to block if no threads! 
} 

Điều này không lý tưởng; ExecutorService có hàng đợi riêng của mình, tất nhiên, vì vậy những gì thực sự xảy ra là tôi luôn hoàn toàn thoát khỏi hàng đợi công việc của mình và điền vào hàng đợi nhiệm vụ, từ đó làm trống khi các tác vụ hoàn tất.

Tôi nhận ra rằng tôi có thể xếp hàng nhiệm vụ ở cuối nhà sản xuất, nhưng tôi thực sự không muốn làm điều đó - Tôi thích sự gián đoạn/cô lập hàng đợi công việc của tôi là những chuỗi câm; nó thực sự không phải là kinh doanh của nhà sản xuất những gì sẽ xảy ra với họ. Buộc nhà sản xuất xếp hàng một Runnable hoặc Callable phá vỡ một trừu tượng, IMHO.

Nhưng tôi muốn hàng đợi công việc được chia sẻ đại diện cho trạng thái xử lý hiện tại. Tôi muốn có thể chặn các nhà sản xuất nếu người tiêu dùng không theo kịp.

Tôi rất muốn sử dụng Executors, nhưng tôi cảm thấy mình đang chiến đấu với thiết kế của họ. Tôi có thể uống Kool-ade một phần không, hoặc tôi có phải uống nó không? Tôi có bị sai lầm trong việc chống lại các nhiệm vụ xếp hàng không? (Tôi nghi ngờ tôi có thể thiết lập ThreadPoolExecutor để sử dụng hàng đợi 1 nhiệm vụ và ghi đè phương thức thực thi của nó để chặn thay vì từ chối-trên-hàng đợi đầy đủ, nhưng điều đó cảm thấy tổng.)

Gợi ý?

Trả lời

14

Tôi muốn hàng đợi công việc được chia sẻ tới đại diện cho trạng thái xử lý hiện tại .

Hãy thử sử dụng một chia sẻ BlockingQueue và có một nhóm chủ đề công việc lấy các mục công việc ngoài Hàng đợi.

Tôi muốn có thể chặn các nhà sản xuất nếu người tiêu dùng không lưu giữ.

Cả ArrayBlockingQueueLinkedBlockingQueue hỗ trợ giáp hàng đợi như vậy mà họ sẽ chặn trên đặt khi đầy đủ. Sử dụng phương pháp chặn put() đảm bảo rằng nhà sản xuất bị chặn nếu hàng đợi đầy.

Đây là một khởi đầu khó khăn. Bạn có thể điều chỉnh số lượng công nhân và kích thước hàng đợi:

public class WorkerTest<T> { 

    private final BlockingQueue<T> workQueue; 
    private final ExecutorService service; 

    public WorkerTest(int numWorkers, int workQueueSize) { 
     workQueue = new LinkedBlockingQueue<T>(workQueueSize); 
     service = Executors.newFixedThreadPool(numWorkers); 

     for (int i=0; i < numWorkers; i++) { 
      service.submit(new Worker<T>(workQueue)); 
     } 
    } 

    public void produce(T item) { 
     try { 
      workQueue.put(item); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } 
    } 


    private static class Worker<T> implements Runnable { 
     private final BlockingQueue<T> workQueue; 

     public Worker(BlockingQueue<T> workQueue) { 
      this.workQueue = workQueue; 
     } 

     @Override 
     public void run() { 
      while (!Thread.currentThread().isInterrupted()) { 
       try { 
        T item = workQueue.take(); 
        // Process item 
       } catch (InterruptedException ex) { 
        Thread.currentThread().interrupt(); 
        break; 
       } 
      } 
     } 
    } 
} 
+0

Cảm ơn; triển khai trước đây của tôi rất giống như thế này, mặc dù nó chỉ sử dụng ThreadFactory - một khi bạn giảm nó thành một tập hợp cố định các luồng mà tất cả cố gắng thoát khỏi hàng đợi công việc, có ít điểm trong việc sử dụng ExecutorService nữa. Tôi đã chuyển sang ExecutorService để tận dụng lợi thế của một nhóm luồng có thể điều chỉnh được nhiều hơn, với ngữ nghĩa "tìm chuỗi công nhân hiện có nếu có, tạo một cái nếu cần thiết, giết chúng nếu chúng không hoạt động." –

+0

Executors.newCachedThreadPool() sẽ làm một cái gì đó tương tự như vậy. Bạn cũng có thể thực sự tinh chỉnh chính sách nhóm trên chính ThreadPoolExecutor. Bạn đang làm gì sau đó? – Kevin

+0

Đó là ý tưởng ... nó có thể được điều chỉnh chính xác theo cách tôi thích, nếu tôi sẵn sàng sử dụng hàng đợi công việc nhiệm vụ của nó. Những gì tôi thực sự muốn là để khắc ra các hồ bơi smarts hồ bơi từ các nhà điều hành và thực hiện riêng của khách hàng hồ bơi thread của tôi, nhưng nó không thực sự thiết lập cho điều đó. –

0

Trực tiếp thay vì bắt đầu chuỗi mới. Kết hợp điều này với một hàng đợi chặn với kích thước tối đa và tôi nghĩ rằng bạn sẽ có được những gì bạn muốn. Người tiêu dùng của bạn trở thành một công nhân đang thực hiện các nhiệm vụ nội tuyến dựa trên các mục công việc trên hàng đợi. Họ sẽ chỉ dequeue mặt hàng nhanh như họ xử lý chúng để sản xuất của bạn khi người tiêu dùng của bạn ngừng tiêu thụ.

+0

Điều đó sẽ chỉ cung cấp cho một công nhân và tôi muốn điều chỉnh nó để tối đa hóa quá trình xử lý cho một cấu hình hệ thống nhất định. Công việc xử lý sẽ kéo dài giữa một phần nhỏ của một giây và hàng chục giây, và là một kết hợp của công việc ràng buộc I/O-ràng buộc và CPU. –

+0

Tôi giả định nhiều người tiêu dùng liên kết với một hoặc nhiều nhà sản xuất. Nếu bạn chỉ có một người tiêu dùng duy nhất, thì tại sao không có nhà sản xuất đổ công việc trực tiếp vào người điều hành? –

1

"tìm chuỗi công nhân hiện có nếu tồn tại, tạo một công cụ nếu cần, giết chúng nếu chúng không hoạt động".

Quản lý tất cả các trạng thái công nhân đó là không cần thiết vì nó nguy hiểm.Tôi sẽ tạo một chuỗi màn hình liên tục chạy ở chế độ nền, nhiệm vụ duy nhất là lấp đầy hàng đợi và sinh ra người tiêu dùng ... tại sao không làm chủ đề công nhân daemon để họ chết ngay khi hoàn thành? Nếu bạn đính kèm tất cả vào một ThreadGroup, bạn có thể tự động kích thước lại nhóm ... ví dụ:

**for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
     spawnDaemonWorkers(queue.poll()); 
    }** 
Các vấn đề liên quan