2013-05-16 36 views
5

Tôi có N công nhân chia sẻ hàng đợi các yếu tố để tính toán. Tại mỗi lần lặp lại, mỗi nhân viên loại bỏ một phần tử khỏi hàng đợi và có thể tạo ra nhiều phần tử hơn để tính toán, mà sẽ được đặt trong cùng một hàng đợi. Về cơ bản, mỗi nhà sản xuất cũng là một người tiêu dùng. Việc tính toán kết thúc khi không có phần tử nào trong hàng đợi và tất cả các công nhân đã hoàn thành việc tính toán phần tử hiện tại (do đó không thể tạo ra các phần tử tính toán nữa). Tôi muốn tránh một điều phối viên/điều phối viên, vì vậy các công nhân nên phối hợp. Mô hình nào tốt nhất để cho phép một nhân viên tìm hiểu xem các điều kiện dừng có hợp lệ không, và do đó ngăn chặn việc tính toán thay mặt cho những người khác?Người tiêu dùng sản xuất Java với điều kiện dừng

Ví dụ, nếu tất cả các chủ đề chỉ có thể làm chu kỳ này, khi các yếu tố đều tính toán, nó sẽ cho kết quả trong tất cả các chủ đề bị chặn vĩnh viễn:

while (true) { 
    element = queue.poll(); 
    newElements[] = compute(element); 
    if (newElements.length > 0) { 
     queue.addAll(newElements); 
    } 
} 

Trả lời

6

Duy trì một số của chủ đề hoạt động. vòng bỏ phiếu

public class ThreadCounter { 
    public static final AtomicInteger threadCounter = new AtomicInteger(N); 
    public static final AtomicInteger queueCounter = new AtomicInteger(0); 
    public static final Object poisonPill = new Object(); 
    public static volatile boolean cancel = false; // or use a final AomticBoolean instead 
} 

đề của bạn nên trông giống như sau (tôi giả định rằng bạn đang sử dụng một BlockingQueue)

while(!ThreadCounter.cancel) { 
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking 
    if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) { 
     ThreadCounter.cancel = true; 
     queue.offer(ThreadCounter.poisonPill); 
    } else { 
     Object obj = queue.take(); 
     ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking 
     ThreadCounter.queueCounter.decrementAndGet(); 
     if(obj == ThreadCounter.poisonPill) { 
      queue.offer(obj); // send the poison pill back through the queue so the other threads can read it 
      continue; 
     } 
    } 
} 

Nếu một thread là về để chặn trên BlockingQueue sau đó nó decrements quầy; nếu tất cả các chủ đề đã chờ đợi trên hàng đợi (nghĩa là counter == 0), thì chủ đề cuối cùng đặt cancel thành true, sau đó gửi một loại thuốc độc qua hàng đợi để đánh thức các chủ đề khác; mỗi thread nhìn thấy viên thuốc độc, gửi nó trở lại thông qua hàng đợi để đánh thức các chủ đề còn lại, và sau đó thoát khỏi vòng lặp khi thấy rằng cancel được đặt thành true.

Edit: Tôi đã gỡ bỏ cuộc đua dữ liệu bằng cách thêm một queueCounter duy trì việc đếm số lượng đối tượng trong hàng đợi (rõ ràng bạn cũng sẽ cần phải thêm một cuộc gọi queueCounter.incrementAndGet() các đối tượng bất cứ nơi nào bạn đang thêm vào hàng đợi). Điều này hoạt động như sau: nếu threadCount == 0, nhưng queueCount != 0, thì điều này có nghĩa là một chủ đề vừa xóa một mục khỏi hàng đợi nhưng chưa được gọi là threadCount.getAndIncrement và do đó biến hủy là không được đặt thành true. Điều quan trọng là cuộc gọi threadCount.getAndIncrement trước cuộc gọi queueCount.getAndDecrement, nếu không bạn vẫn sẽ có cuộc đua dữ liệu. Nó không quan trọng theo thứ tự bạn gọi là queueCount.getAndIncrement vì bạn sẽ không xen kẽ điều này với một cuộc gọi đến threadCount.getAndDecrement (cái thứ hai sẽ được gọi ở cuối vòng lặp, cái trước sẽ được gọi ở đầu vòng lặp).

Lưu ý rằng bạn không thể chỉ sử dụng queueCount để xác định thời điểm kết thúc quá trình, vì luồng vẫn có thể hoạt động mà chưa đặt bất kỳ dữ liệu nào trong hàng đợi - nói cách khác, queueCount sẽ bằng 0 được khác 0 khi thread kết thúc quá trình lặp hiện tại của nó.

Thay vì liên tục gửi poisonPill qua hàng đợi, thay vào đó bạn có thể hủy gửi chủ đề hủy (N-1) poisonPills qua hàng đợi. Chỉ cần thận trọng nếu bạn sử dụng phương pháp này sử dụng hàng đợi khác, vì một số hàng đợi (ví dụ: Dịch vụ xếp hàng đơn giản của Amazon) có thể trả về nhiều mục tương đương với phương pháp take của chúng, trong trường hợp này, bạn cần phải gửi liên tục qua poisonPill để đảm bảo mọi thứ tắt.

Bên cạnh đó, thay vì sử dụng một vòng lặp while(!cancel), bạn có thể sử dụng một vòng lặp while(true) và phá vỡ khi vòng lặp phát hiện một poisonPill

+0

tôi đoán hủy nên không ổn định? – marcorossi

+0

điều gì sẽ xảy ra khi một chuỗi nằm giữa lệnh gọi queue.take() và incrementAndGet() và luồng khác đang đánh giá điều kiện count == 0 và queue.isEmpty()? – marcorossi

+0

@marcorossi yeah 'cancel' nên dễ bay hơi để thay đổi hiển thị trên tất cả các chuỗi. Ví dụ, tất cả các luồng xem ghi gần đây nhất cho trường 'cancel', nó đảm bảo đọc và ghi là nguyên tử. Đã chỉnh sửa câu trả lời. – adamjmarkham

Các vấn đề liên quan