2015-03-26 17 views
16

Tôi đang cố gắng hiểu cách Java FixedThreadPool hoạt động trong thực tế, nhưng các tài liệu không trả lời câu hỏi của tôi.Hiểu Java FixedThreadPool

Giả sử một kịch bản đơn giản như:

ExecutorService ES= Executors.newFixedThreadPool(3); 
List<Future> FL; 
for(int i=1;i<=200;i++){ 
    FL.add(ES.submit(new Task())); 
} 
ES.shutdown(); 

nơi Task là một Callable đó xây dựng một số tài nguyên, sử dụng chúng, và trả về một số đầu ra.

Câu hỏi của tôi: số lượng Task có trong bộ nhớ khi hoàn thành vòng lặp for không? Nói cách khác: sẽ chỉ có 3 Task tại một thời điểm xây dựng nguồn lực của họ, hoặc tất cả chúng được tạo trước, như vậy, sau .submit Tôi có 200 Task (và tài nguyên của họ) đang chờ để thực thi?

Lưu ý: việc xây dựng tài nguyên diễn ra trong hàm tạo Task, không theo phương thức call().

Trong javadoc (cảm thấy tự do để bỏ qua sau): những gì confuses me là lời giải thích sau đây trong các tài liệu Java

Tạo ra một hồ bơi thread mà tái sử dụng một số cố định của chủ đề hoạt động ra một hàng đợi không được chia sẻ được chia sẻ. Tại bất kỳ thời điểm nào, tại hầu hết các chủ đề nThreads sẽ là các nhiệm vụ xử lý hoạt động.

Tôi cho rằng điều này có nghĩa là, trong ví dụ của tôi, tất cả 200 Tác vụ nằm trong hàng đợi, nhưng chỉ có 3 trong số đó được thực thi bất kỳ lúc nào.

Mọi trợ giúp đều được đánh giá cao.

+0

Bạn ngay sau vòng lặp bạn có 200 trường hợp được tạo của 'Task' trong hàng đợi của nhóm, thực thi 3 tác vụ cùng một lúc. – alex2410

+0

Câu hỏi của bạn không thể trả lời được, bởi vì điều này phụ thuộc rất lớn vào thời điểm và khi nào người thu gom rác đá vào để thu thập các công việc đã hoàn thành. – SpaceTrucker

+0

fyi 'newFixedThreadPool (nThreads)' chỉ là phương thức convience cho 'new ThreadPoolExecutor (nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ());' – for3st

Trả lời

3

Tất cả 200 Tác vụ được tạo và sử dụng tài nguyên và tất cả chúng đều nằm trong hàng đợi.

Nhóm luồng, chỉ gọi từng phương thức run()/call() của chúng, khi một chuỗi miễn phí có sẵn để thực thi.

10

Mã của bạn là tương đương với

for (int i = 1; i <= 200; i++){ 
    Task t = new Task(); 
    FL.add(ES.submit(t)); 
} 

Và sau khi vòng lặp for, các nhà xây dựng của công tác đã như vậy được gọi là 200 lần, và mã nó chứa đã như vậy được thực hiện 200 lần. Cho dù tác vụ được gửi đến một người thi hành hay không là không liên quan: bạn đang gọi một hàm tạo 200 lần trong một vòng lặp và sau khi mỗi tác vụ đã được xây dựng, nó được gửi đến người thi hành. Người thi hành không phải là người gọi hàm khởi tạo nhiệm vụ.

+0

Vì vậy, nếu tôi hiểu chính xác, tôi có tất cả 200 Tác vụ trong bộ nhớ.Những gì thực sự được thực hiện song song là thực hiện phương thức 'call()' của Task (đó là một 'Callable'). Do đó, tôi sẽ có 3 trường hợp song song của phương thức 'call()' (và các đối tượng và tài nguyên mà nó tạo ra). Có phải tôi đang trên đường ray bên phải không? – ZzKr

+1

3 luồng trong hồ bơi thực sự chạy đồng thời và gọi phương thức 'call()' của các tác vụ đã gửi. Tuy nhiên, các phương thức không được khởi tạo, vì vậy hãy nói rằng bạn có "3 trường hợp của phương thức' call() 'là không chính xác. –

+0

's/instances/invocations /' –

6

Nhiệm vụ sẽ bị xóa từng người một khỏi hàng đợi, do đó, khi tiến hành thực hiện, các Tác vụ sẽ bị xóa và chỉ kết quả của chúng sẽ được lưu trữ trong các đối tượng Tương lai đó.

Vì vậy, về cơ bản trong bộ nhớ:

3 Chủ đề
200 -> 0 công tác
0 -> 200 Future
(với tất cả các nhiệm vụ thực hiện)

4

Bạn đang tạo 200 đối tượng sử dụng new Task() và những nhiệm vụ được gửi đến người thực thi. Những người thi hành giữ một tham chiếu đến đối tượng Task này. Vì vậy, nếu trong các nhà xây dựng của Task bạn đang xây dựng và nắm giữ các nguồn lực thì tất cả các nhiệm vụ 200 sẽ được tổ chức các nguồn lực.

Nếu có thể, bạn có thể xây dựng và sử dụng tài nguyên theo phương thức gọi Task nếu bạn không muốn 200 trường hợp xây dựng và giữ tài nguyên. Trong trường hợp đó, chỉ 3 Task tại một thời điểm sẽ xây dựng và giữ tài nguyên.

3

Để hiểu điều này, bạn sẽ cần phải xem điều gì đang xảy ra khi bạn gửi nhiệm vụ cho Người thi hành trong một vòng lặp. Trước tiên, chúng ta sẽ chỉ xem xét việc gửi một nhiệm vụ duy nhất cho Người thực thi. Tôi bây giờ sẽ được đề cập đến mã JDK 1.7.0_51 nguồn

Phương pháp tĩnh Executor.newFixedThreadPool phương thức trả về một ThreadPoolExecutor chứa một hàng đợi chặn để giữ nhiệm vụ

public static ExecutorService newFixedThreadPool(int nThreads) { 
     return new ThreadPoolExecutor(nThreads, nThreads, 
             0L, TimeUnit.MILLISECONDS, 
             new LinkedBlockingQueue<Runnable>()); 
    } 

Thời điểm bạn thêm một nhiệm vụ để Executor này, điều này đi vào phương thức gửi ThreadPoolExecutor mở rộng AbstractExecutorService nơi thực hiện phương thức gửi được viết.

public <T> Future<T> submit(Callable<T> task) { 
     if (task == null) throw new NullPointerException(); 
     RunnableFuture<T> ftask = newTaskFor(task); 
     execute(ftask); 
     return ftask; 
    } 

Các phương thức execute là thực hiện cụ thể (có nghĩa là các loại khác nhau của Executor thực hiện nó cách khác nhau)

Bây giờ đến thịt thực sự. Đây là phương thức thực hiện được xác định trong ThreadPoolExecutor. Đặc biệt chú ý tại các ý kiến. Ở đây, một vài thông số cấu hình của ThreadPoolExecutor như corePoolSize có hiệu lực.

public void execute(Runnable command) { 
     if (command == null) 
      throw new NullPointerException(); 
     /* 
     * Proceed in 3 steps: 
     * 
     * 1. If fewer than corePoolSize threads are running, try to 
     * start a new thread with the given command as its first 
     * task. The call to addWorker atomically checks runState and 
     * workerCount, and so prevents false alarms that would add 
     * threads when it shouldn't, by returning false. 
     * 
     * 2. If a task can be successfully queued, then we still need 
     * to double-check whether we should have added a thread 
     * (because existing ones died since last checking) or that 
     * the pool shut down since entry into this method. So we 
     * recheck state and if necessary roll back the enqueuing if 
     * stopped, or start a new thread if there are none. 
     * 
     * 3. If we cannot queue task, then we try to add a new 
     * thread. If it fails, we know we are shut down or saturated 
     * and so reject the task. 
     */ 
     int c = ctl.get(); 
     if (workerCountOf(c) < corePoolSize) { 
      if (addWorker(command, true)) 
       return; 
      c = ctl.get(); 
     } 
     if (isRunning(c) && workQueue.offer(command)) { 
      int recheck = ctl.get(); 
      if (! isRunning(recheck) && remove(command)) 
       reject(command); 
      else if (workerCountOf(recheck) == 0) 
       addWorker(null, false); 
     } 
     else if (!addWorker(command, false)) 
      reject(command); 
    } 
Các vấn đề liên quan