2012-06-28 26 views
11

Tôi cần triển khai một nhóm luồng trong Java (java.util.concurrent) có số lượng chủ đề ở một giá trị tối thiểu khi không hoạt động, tăng lên đến giới hạn trên (nhưng không bao giờ hơn nữa) khi công việc được gửi vào nó nhanh hơn họ hoàn thành thực hiện, và co lại trở lại giới hạn dưới khi tất cả các công việc được thực hiện và không có thêm công việc được gửi.Tạo hồ bơi chủ đề động (đang phát triển/thu hẹp)

Bạn sẽ triển khai điều gì đó như thế? Tôi tưởng tượng rằng đây sẽ là một kịch bản sử dụng khá phổ biến, nhưng dường như các phương pháp nhà máy java.util.concurrent.Executors chỉ có thể tạo các hồ bơi và hồ bơi có kích thước cố định phát triển không thành công khi nhiều công việc được gửi. Lớp ThreadPoolExecutor cung cấp các thông số corePoolSizemaximumPoolSize, nhưng tài liệu của nó dường như ngụ ý rằng cách duy nhất để có nhiều hơn corePoolSize chủ đề cùng một lúc là sử dụng hàng đợi công việc bị ràng buộc, trong trường hợp đó, nếu bạn đã đạt đến chủ đề maximumPoolSize, bạn sẽ bị từ chối công việc mà bạn phải đối phó với chính mình? Tôi đã đưa ra điều này:

//pool creation 
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS, 
    new ArrayBlockingQueue<Runnable>(minSize)); 
... 

//submitting jobs 
for (Runnable job : ...) { 
    while (true) { 
     try { 
      pool.submit(job); 
      System.out.println("Job " + job + ": submitted"); 
      break; 
     } catch (RejectedExecutionException e) { 
      // maxSize jobs executing concurrently atm.; re-submit new job after short wait 
      System.out.println("Job " + job + ": rejected..."); 
      try { 
       Thread.sleep(300); 
      } catch (InterruptedException e1) { 
      } 
     } 
    } 
} 

Tôi có thấy gì không? Có cách nào tốt hơn để làm điều này? Ngoài ra, tùy thuộc vào yêu cầu của một người, nó có thể là vấn đề mà các mã trên sẽ không kết thúc cho đến khi ít nhất (tôi nghĩ) (total number of jobs) - maxSize công việc đã hoàn thành. Vì vậy, nếu bạn muốn có thể gửi một số lượng công việc tùy ý vào hồ bơi và tiến hành ngay lập tức mà không cần đợi bất kỳ công việc nào hoàn thành, tôi không thấy cách bạn có thể làm điều đó mà không có chuỗi "công việc tổng hợp" chuyên dụng hàng đợi không được yêu cầu để giữ tất cả các công việc đã gửi. AFAICS, nếu bạn đang sử dụng một hàng đợi không bị ràng buộc cho chính ThreadPoolExecutor, số lượng chuỗi của nó sẽ không bao giờ phát triển vượt quá corePoolSize.

+3

Tôi phải thừa nhận, tôi không thấy được tính hữu ích của một luồng có kích thước động. Số lượng bộ vi xử lý của bạn trên bảng của bạn có thay đổi trong thời gian hoạt động của ứng dụng của bạn không? – corsiKa

+3

Tại sao 'newCachedThreadPool' không phù hợp với hoàn cảnh của bạn? Nó tự động giết chết các chủ đề không được sử dụng nữa. – Tudor

+0

Điều gì sẽ xảy ra nếu chủ đề nhàn rỗi của bạn không chết? Giả sử bạn có một hồ bơi kích thước cố định có kích thước tối đa mọi lúc? Chuyện gì sẽ xảy ra? –

Trả lời

4

Một mẹo có thể giúp bạn là chỉ định RejectedExecutionHandler sử dụng cùng một chuỗi để gửi công việc vào hàng đợi chặn. Điều đó sẽ chặn các thread hiện tại và loại bỏ sự cần thiết cho một số loại vòng lặp.

Xem câu trả lời của tôi ở đây:

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

Dưới đây là bộ xử lý từ chối sao chép từ câu trả lời đó.

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200); 
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 
     0L, TimeUnit.MILLISECONDS, queue); 
// by default (unfortunately) the ThreadPoolExecutor will call the rejected 
// handler when you submit the 201st job, to have it block you do: 
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     // this will block if the queue is full 
     executor.getQueue().put(r); 
    } 
}); 

Sau đó, bạn sẽ có thể tận dụng các lõi/thread max đếm miễn là bạn nhận ra rằng hàng đợi chặn giáp mà bạn sử dụng đầu tiên đầy lên trước bất kỳ chủ đề được tạo ra trên các chủ đề cốt lõi. Vì vậy, nếu bạn có 10 chủ đề cốt lõi và bạn muốn công việc thứ 11 để bắt đầu chủ đề thứ 11, bạn sẽ cần phải có một hàng đợi chặn với kích thước 0 không may (có thể là SynchronousQueue). Tôi cảm thấy rằng đây là một hạn chế thực sự trong các lớp học tuyệt vời khác ExecutorService.

1

Đặt maximumPoolSize thành Integer.MAX_VALUE. Nếu bạn đã có hơn 2 tỷ chủ đề ... tốt, chúc may mắn với điều đó.

Dù sao, Javadoc của ThreadPoolExecutor trạng thái:

Bằng cách đặt maximumPoolSize đến một giá trị cơ bản vô biên như Integer.MAX_VALUE, bạn cho phép các hồ bơi để chứa một số tùy ý các nhiệm vụ đồng thời. Thông thường nhất, kích thước hồ bơi lõi và tối đa chỉ được thiết lập khi xây dựng, nhưng chúng cũng có thể được thay đổi động bằng cách sử dụng setCorePoolSize (int) và setMaximumPoolSize (int).

Với hàng đợi công việc không bị chặn tương tự như LinkedBlockingQueue, điều này sẽ có dung lượng lớn tùy ý.

+0

Liệu người chăm sóc downvoter có giải thích được không? –

+0

Cảm ơn cũng tham khảo http://stackoverflow.com/questions/28567238/threadpoolexecutor-does-not-shrink-properly/40384042#40384042 thu hẹp này nhấn các vấn đề khác được giải quyết trong câu hỏi khác – Vahid

+0

ông muốn có nó bị chặn, không bị chặn. .. – Xerus

8

Khi tăng và thu hẹp đi kèm với chuỗi, chỉ có một tên xuất hiện trong tâm trí của tôi: CachedThreadPool từ gói java.util.concurrent.

ExecutorService executor = Executors.newCachedThreadPool(); 

CachedThreadPool() có thể tái sử dụng thread, cũng như tạo chủ đề mới khi cần thiết. Và có, nếu một chủ đề không hoạt động trong 60 giây, CachedThreadPool sẽ xóa nó. Vì vậy, điều này là khá nhẹ - phát triển và thu hẹp trong lời nói của bạn!

+6

Phải nhưng không bị ràng buộc. – Gray

+0

Bạn có thể sử dụng ThreadPoolExecutor cơ bản và thiết lập maximumPoolSize theo cách thủ công hoặc ngay cả khi chạy –

Các vấn đề liên quan