2015-12-10 16 views
7

Tôi có một quy trình cần phải tính nhiều tác vụ nhỏ song song, và sau đó xử lý kết quả theo thứ tự tự động của các tác vụ. Để làm điều này, tôi có các thiết lập sau:Cố định chủ đề Hồ bơi chủ đề chặn, khi đủ công việc gửi

Một ExecutorService đơn giản, và một hàng đợi chặn tôi sẽ sử dụng để giữ các đối tượng trong tương lai trả về khi một Callable được gửi đến người thi hành:

ExecutorService exec = Executors.newFixedThreadPool(15); 
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64); 

Một số mã gỡ lỗi để đếm số lượng nộp và số nhiệm vụ xử lý và viết chúng ra một cách thường xuyên (lưu ý rằng processed được tăng lên ở phần cuối của mã nhiệm vụ riêng của mình):

AtomicLong processed = new AtomicLong(0); 
AtomicLong submitted = new AtomicLong(0); 

Timer statusTimer = new Timer(); 
statusTimer.schedule(new TimerTask() { 
     @Override 
     public void run() { 
     l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get()))); 
     }    
}, 60 * 1000, 60 * 1000); 

một chủ đề mà có nhiệm vụ từ một hàng đợi (đó là actuall ya máy phát điện) và nộp cho họ để người thi hành, đưa tương lai dẫn đến futures hàng đợi (đây là cách tôi chắc chắn rằng tôi không nộp quá nhiều nhiệm vụ một chạy ra khỏi bộ nhớ):

Thread submitThread = new Thread(() -> 
{ 
    MyTask task; 
    try { 
     while ((task = taskQueue.poll()) != null) { 
      futures.put(exec.submit(task)); 
      submitted.incrementAndGet(); 
     } 
    } catch (Exception e) {l .error("Unexpected Exception", e);} 
}, "SubmitTasks"); 
submitThread.start(); 

Các thread hiện hành sau đó take -s nhiệm vụ kết thúc các futures hàng đợi và xử lý kết quả:

while (!futures.isEmpty() || submitThread.isAlive()) { 
    MyTask task = futures.take().get(); 
    //process result 
} 

Khi tôi chạy trên một máy chủ với 8 lõi (lưu ý rằng mã hiện đang sử dụng 15 bài) các đỉnh núi sử dụng CPU ở mức khoảng 60% chỉ . Tôi thấy kết xuất debug của tôi như thế này:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947 
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889 
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853 
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871 
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818 
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950 

Một bãi chứa thread cho thấy rằng nhiều thread hồ bơi đang chặn như thế này:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) 
     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) 
     at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) 
     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439) 
     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

giải thích của tôi về đầu ra debug là tại bất kỳ điểm nào trong thời gian đó, tôi có ít nhất vài trăm tác vụ đã được gửi cho dịch vụ thi hành, nhưng chưa được xử lý (tôi cũng có thể xác nhận theo dõi ngăn xếp rằng luồng SubmitTasks bị chặn trên LinkedBlockingQueue.put). Tuy nhiên, dấu vết stack (và số liệu thống kê sử dụng máy chủ) đang cho tôi thấy rằng dịch vụ Executor bị chặn trên LinkedBlockingQueue.take (những gì tôi giả định là hàng đợi nhiệm vụ nội bộ bị trống).

Tôi đang đọc gì?

Trả lời

0

Các chủ đề liên quan đến BlockingQueue s luôn phức tạp. Chỉ cần nhìn vào mã của bạn mà không bao giờ chạy với quy mô bạn làm. Tôi có một số gợi ý. Đó là lời khuyên của nhiều chuyên gia trong ngành như Jessica Kerr rằng bạn sẽ không bao giờ chặn mãi mãi. Những gì bạn có thể làm là sử dụng các phương pháp với timeouts trong LinkedBlockingQueue.

Thread submitThread = new Thread(() -> 
{ 
    MyTask task; 
    try { 
     while ((task = taskQueue.peek()) != null) { 
      boolean success = futures.offer(exec.submit(task), 1000, TimeUnit.MILLISECONDS); 
      if(success) { 
       submitted.incrementAndGet(); 
       taskQueue.remove(task); 
      } 
     } 
    } catch (Exception e) {l .error("Unexpected Exception", e);} 
}, "SubmitTasks"); 
submitThread.start(); 

Và tại đây cũng có.

while (!futures.isEmpty() || submitThread.isAlive()) { 
    Future<MyTask> f = futures.poll(1000, TimeUnit.MILLISECONDS); 
    if(f != null) { 
     MyTask task = f.get(); 
    } 
    //process result 
} 

Xem video này bởi Jessica Kerr trên Concurrency tools in JVM

Các vấn đề liên quan