Duy trì một số của chủ đề hoạt động. vòng bỏ phiếu
public class ThreadCounter {
public static final AtomicInteger threadCounter = new AtomicInteger(N);
public static final AtomicInteger queueCounter = new AtomicInteger(0);
public static final Object poisonPill = new Object();
public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}
đề của bạn nên trông giống như sau (tôi giả định rằng bạn đang sử dụng một BlockingQueue
)
while(!ThreadCounter.cancel) {
int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
ThreadCounter.cancel = true;
queue.offer(ThreadCounter.poisonPill);
} else {
Object obj = queue.take();
ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
ThreadCounter.queueCounter.decrementAndGet();
if(obj == ThreadCounter.poisonPill) {
queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
continue;
}
}
}
Nếu một thread là về để chặn trên BlockingQueue
sau đó nó decrements quầy; nếu tất cả các chủ đề đã chờ đợi trên hàng đợi (nghĩa là counter == 0
), thì chủ đề cuối cùng đặt cancel
thành true, sau đó gửi một loại thuốc độc qua hàng đợi để đánh thức các chủ đề khác; mỗi thread nhìn thấy viên thuốc độc, gửi nó trở lại thông qua hàng đợi để đánh thức các chủ đề còn lại, và sau đó thoát khỏi vòng lặp khi thấy rằng cancel
được đặt thành true.
Edit: Tôi đã gỡ bỏ cuộc đua dữ liệu bằng cách thêm một queueCounter
duy trì việc đếm số lượng đối tượng trong hàng đợi (rõ ràng bạn cũng sẽ cần phải thêm một cuộc gọi queueCounter.incrementAndGet()
các đối tượng bất cứ nơi nào bạn đang thêm vào hàng đợi). Điều này hoạt động như sau: nếu threadCount == 0
, nhưng queueCount != 0
, thì điều này có nghĩa là một chủ đề vừa xóa một mục khỏi hàng đợi nhưng chưa được gọi là threadCount.getAndIncrement
và do đó biến hủy là không được đặt thành true. Điều quan trọng là cuộc gọi threadCount.getAndIncrement
trước cuộc gọi queueCount.getAndDecrement
, nếu không bạn vẫn sẽ có cuộc đua dữ liệu. Nó không quan trọng theo thứ tự bạn gọi là queueCount.getAndIncrement
vì bạn sẽ không xen kẽ điều này với một cuộc gọi đến threadCount.getAndDecrement
(cái thứ hai sẽ được gọi ở cuối vòng lặp, cái trước sẽ được gọi ở đầu vòng lặp).
Lưu ý rằng bạn không thể chỉ sử dụng queueCount
để xác định thời điểm kết thúc quá trình, vì luồng vẫn có thể hoạt động mà chưa đặt bất kỳ dữ liệu nào trong hàng đợi - nói cách khác, queueCount
sẽ bằng 0 được khác 0 khi thread kết thúc quá trình lặp hiện tại của nó.
Thay vì liên tục gửi poisonPill
qua hàng đợi, thay vào đó bạn có thể hủy gửi chủ đề hủy (N-1) poisonPills
qua hàng đợi. Chỉ cần thận trọng nếu bạn sử dụng phương pháp này sử dụng hàng đợi khác, vì một số hàng đợi (ví dụ: Dịch vụ xếp hàng đơn giản của Amazon) có thể trả về nhiều mục tương đương với phương pháp take
của chúng, trong trường hợp này, bạn cần phải gửi liên tục qua poisonPill
để đảm bảo mọi thứ tắt.
Bên cạnh đó, thay vì sử dụng một vòng lặp while(!cancel)
, bạn có thể sử dụng một vòng lặp while(true)
và phá vỡ khi vòng lặp phát hiện một poisonPill
tôi đoán hủy nên không ổn định? – marcorossi
điều gì sẽ xảy ra khi một chuỗi nằm giữa lệnh gọi queue.take() và incrementAndGet() và luồng khác đang đánh giá điều kiện count == 0 và queue.isEmpty()? – marcorossi
@marcorossi yeah 'cancel' nên dễ bay hơi để thay đổi hiển thị trên tất cả các chuỗi. Ví dụ, tất cả các luồng xem ghi gần đây nhất cho trường 'cancel', nó đảm bảo đọc và ghi là nguyên tử. Đã chỉnh sửa câu trả lời. – adamjmarkham