2012-04-03 24 views
5

Tôi muốn sử dụng một CompletionService để xử lý các kết quả từ một chuỗi các chủ đề khi chúng được hoàn thành. Tôi có dịch vụ trong một vòng lặp để đưa các đối tượng trong tương lai mà nó cung cấp khi chúng có sẵn, nhưng tôi không biết cách tốt nhất để xác định khi nào tất cả các chủ đề đã hoàn thành (và do đó thoát khỏi vòng lặp):Làm thế nào để biết khi nào một dịch vụ CompletionService kết thúc phân phối kết quả?

import java.util.concurrent.Callable; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.ThreadPoolExecutor; 

public class Bar { 

    final static int MAX_THREADS = 4; 
    final static int TOTAL_THREADS = 20; 

    public static void main(String[] args) throws Exception{ 

     final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS); 
     final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool); 

     for (int i=0; i<TOTAL_THREADS; i++){ 
      service.submit(new MyCallable(i)); 
     } 

     int finished = 0; 
     Future<Integer> future = null; 
     do{ 
      future = service.take(); 
      int result = future.get(); 
      System.out.println(" took: " + result); 
      finished++;    

     }while(finished < TOTAL_THREADS); 

     System.out.println("Shutting down"); 
     threadPool.shutdown(); 
    } 


    public static class MyCallable implements Callable<Integer>{ 

     final int id; 

     public MyCallable(int id){ 
      this.id = id; 
      System.out.println("Submitting: " + id); 
     } 

     @Override 
     public Integer call() throws Exception { 
      Thread.sleep(1000); 
      System.out.println("finished: " + id); 
      return id; 
     } 
    } 
} 

Tôi đã thử kiểm tra trạng thái của ThreadPoolExecutor, nhưng tôi biết phương thức getCompletedTaskCount và getTaskCount chỉ xấp xỉ và không nên dựa vào. Có cách nào tốt hơn để đảm bảo rằng tôi đã lấy tất cả các tương lai từ CompletionService hơn là tự đếm chúng?


Edit: Cả hai liên kết mà Nobeh cung cấp, và gợi ý rằng this link đếm số lượng các nhiệm vụ gửi, sau đó gọi mất() mà nhiều lần, là con đường để đi. Tôi chỉ ngạc nhiên là không có cách nào để yêu cầu CompletionService hoặc Executor của nó những gì còn lại để được trả lại.

Trả lời

3

Trả lời những câu hỏi này cung cấp cho bạn câu trả lời?

  • Làm công việc không đồng bộ của bạn tạo các tác vụ khác được gửi đến CompletionService?
  • service đối tượng duy nhất có nhiệm vụ xử lý các tác vụ được tạo trong ứng dụng của bạn?

Dựa trên reference documentation, CompletionService hành vi trên một/cách tiếp cận sản xuất tiêu dùng và lợi dụng một nội Executor. Vì vậy, miễn là, bạn sản xuất các nhiệm vụ ở một nơi và tiêu thụ chúng ở một nơi khác, CompletionService.take() sẽ biểu thị nếu có bất kỳ kết quả nào khác để đưa ra.

Tôi tin rằng this question cũng sẽ giúp bạn.

+0

Cảm ơn, không. Dường như họ cũng chỉ lặp qua số lượng chủ đề, trong "của họ (cho (int tasksHandled = 0; tasksHandled

+1

Ví dụ trong API sử dụng cùng một phương pháp thực hiện take() n lần liên tiếp. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html –

+0

Nếu lấy kết quả từ CompletionService trong một luồng khác với các tác vụ đã được gửi đến Exector, chuỗi đó có an toàn không? – raffian

3

Xem http://www.javaspecialists.eu/archive/Issue214.html để có đề xuất phù hợp về cách mở rộng ExecutorCompletionService để thực hiện những gì bạn đang tìm kiếm. Tôi đã dán mã có liên quan bên dưới để thuận tiện cho bạn. Tác giả cũng đề nghị làm cho dịch vụ thực thi Iterable, mà tôi nghĩ đó sẽ là một ý tưởng hay.

FWIW, tôi đồng ý với bạn rằng điều này thực sự phải là một phần của việc triển khai chuẩn, nhưng than ôi, không phải vậy.

import java.util.concurrent.*; 
import java.util.concurrent.atomic.*; 

public class CountingCompletionService<V> extends ExecutorCompletionService<V> { 
    private final AtomicLong submittedTasks = new AtomicLong(); 
    private final AtomicLong completedTasks = new AtomicLong(); 

    public CountingCompletionService(Executor executor) { 
    super(executor); 
    } 

    public CountingCompletionService(
     Executor executor, BlockingQueue<Future<V>> queue) { 
    super(executor, queue); 
    } 

    public Future<V> submit(Callable<V> task) { 
    Future<V> future = super.submit(task); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> submit(Runnable task, V result) { 
    Future<V> future = super.submit(task, result); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> take() throws InterruptedException { 
    Future<V> future = super.take(); 
    completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll() { 
    Future<V> future = super.poll(); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll(long timeout, TimeUnit unit) 
     throws InterruptedException { 
    Future<V> future = super.poll(timeout, unit); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public long getNumberOfCompletedTasks() { 
    return completedTasks.get(); 
    } 

    public long getNumberOfSubmittedTasks() { 
    return submittedTasks.get(); 
    } 

    public boolean hasUncompletedTasks() { 
    return completedTasks.get() < submittedTasks.get(); 
    } 
} 
Các vấn đề liên quan