Tôi có một quy trình cần phải tính nhiều tác vụ nhỏ song song, và sau đó xử lý kết quả theo thứ tự tự động của các tác vụ. Để làm điều này, tôi có các thiết lập sau:Cố định chủ đề Hồ bơi chủ đề chặn, khi đủ công việc gửi
Một ExecutorService đơn giản, và một hàng đợi chặn tôi sẽ sử dụng để giữ các đối tượng trong tương lai trả về khi một Callable được gửi đến người thi hành:
ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);
Một số mã gỡ lỗi để đếm số lượng nộp và số nhiệm vụ xử lý và viết chúng ra một cách thường xuyên (lưu ý rằng processed
được tăng lên ở phần cuối của mã nhiệm vụ riêng của mình):
AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);
Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
@Override
public void run() {
l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get())));
}
}, 60 * 1000, 60 * 1000);
một chủ đề mà có nhiệm vụ từ một hàng đợi (đó là actuall ya máy phát điện) và nộp cho họ để người thi hành, đưa tương lai dẫn đến futures
hàng đợi (đây là cách tôi chắc chắn rằng tôi không nộp quá nhiều nhiệm vụ một chạy ra khỏi bộ nhớ):
Thread submitThread = new Thread(() ->
{
MyTask task;
try {
while ((task = taskQueue.poll()) != null) {
futures.put(exec.submit(task));
submitted.incrementAndGet();
}
} catch (Exception e) {l .error("Unexpected Exception", e);}
}, "SubmitTasks");
submitThread.start();
Các thread hiện hành sau đó take
-s nhiệm vụ kết thúc các futures
hàng đợi và xử lý kết quả:
while (!futures.isEmpty() || submitThread.isAlive()) {
MyTask task = futures.take().get();
//process result
}
Khi tôi chạy trên một máy chủ với 8 lõi (lưu ý rằng mã hiện đang sử dụng 15 bài) các đỉnh núi sử dụng CPU ở mức khoảng 60% chỉ . Tôi thấy kết xuất debug của tôi như thế này:
INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950
Một bãi chứa thread cho thấy rằng nhiều thread hồ bơi đang chặn như thế này:
"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
giải thích của tôi về đầu ra debug là tại bất kỳ điểm nào trong thời gian đó, tôi có ít nhất vài trăm tác vụ đã được gửi cho dịch vụ thi hành, nhưng chưa được xử lý (tôi cũng có thể xác nhận theo dõi ngăn xếp rằng luồng SubmitTasks bị chặn trên LinkedBlockingQueue.put
). Tuy nhiên, dấu vết stack (và số liệu thống kê sử dụng máy chủ) đang cho tôi thấy rằng dịch vụ Executor bị chặn trên LinkedBlockingQueue.take (những gì tôi giả định là hàng đợi nhiệm vụ nội bộ bị trống).
Tôi đang đọc gì?