2016-02-24 35 views
5

Có hàng đợi các tác vụ đang chờ xử lý được sử dụng kết hợp với số Executors.newWorkStealingPool() của Java 8 không? Ví dụ: giả sử # số lõi khả dụng là 2 và Executors.newWorkStealingPool() trống vì 2 tác vụ đã chạy. Sau đó, điều gì sẽ xảy ra nếu một nhiệm vụ thứ 3 được gửi đến người thực thi trộm cắp công việc? Có phải hàng đợi không? Và nếu nó là, những giới hạn nếu có trên hàng đợi nói là gì?Trong Java 8, Executors.newWorkStealingPool() cũng cung cấp một hàng đợi nhiệm vụ?

Xin cảm ơn trước.

+1

Tôi không có câu trả lời cụ thể và tôi ngạc nhiên rằng tài liệu này không được ghi nhận tốt hơn. Nhưng ít nhất trong OpenJDK 8, phương thức này tạo ra một ['ForkJoinPool'] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html), mà không chỉ đơn giản là sử dụng 'BlockingQueue' như các triển khai khác làm ... gây ra rất nhiều tranh chấp, dẫn đến chi phí. Các tác vụ không thể được thực hiện ngay lập tức * vẫn được xếp hàng đợi. Điều này được thảo luận (cùng với giới hạn hàng đợi) trong một câu trả lời khác: http://stackoverflow.com/a/30045601/228171 –

Trả lời

4

Có hàng đợi các tác vụ đang chờ xử dụng được sử dụng cùng với Executors.newWorkStealingPool() của Java 8 không?

Có, mọi chủ đề đều được hỗ trợ bằng deque riêng. Khi một luồng được thực hiện với nhiệm vụ của nó, nó sẽ lấy nhiệm vụ từ deque của luồng khác và thực hiện nó.

Và nếu có, giới hạn nếu có trên hàng đợi là gì?

Kích thước tối đa cho hàng đợi bị giới hạn bởi số: static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

Khi hàng đợi đầy một ngoại lệ được kiểm soát được ném: RejectedExecutionException("Queue capacity exceeded")

3

Từ grepcode của ExecutorsForkJoinPool

Executors. newWorkStealingPool lợi nhuận ForkJoinPool

Chấp hành viên:

public static ExecutorService newWorkStealingPool() { 
     return new ForkJoinPool 
      (Runtime.getRuntime().availableProcessors(), 
      ForkJoinPool.defaultForkJoinWorkerThreadFactory, 
      null, true); 
    } 

ForkJoinPool:

public ForkJoinPool(int parallelism, 
         ForkJoinWorkerThreadFactory factory, 
         UncaughtExceptionHandler handler, 
         boolean asyncMode) { 
     this(checkParallelism(parallelism), 
      checkFactory(factory), 
      handler, 
      asyncMode ? FIFO_QUEUE : LIFO_QUEUE, 
      "ForkJoinPool-" + nextPoolId() + "-worker-"); 
     checkPermission(); 
    } 

On execute():

public void execute(ForkJoinTask<?> task) { 
     if (task == null) 
      throw new NullPointerException(); 
     externalPush(task); 
    } 

externalPush gọi externalSubmit và bạn có thể xem chi tiết WorkQueue trong triển khai đó.

externalSubmit:

// hoạt động ngoài

/** 
* Full version of externalPush, handling uncommon cases, as well 
* as performing secondary initialization upon the first 
* submission of the first task to the pool. It also detects 
* first submission by an external thread and creates a new shared 
* queue if the one at index if empty or contended. 
* 
* @param task the task. Caller must ensure non-null. 

*/ 

Bạn có thể tìm thêm chi tiết về kích thước hàng đợi trong WorkQueue lớp

static final class WorkQueue { 

Tài liệu trên WokrQueue:

/** 
    * Queues supporting work-stealing as well as external task 
    * submission. See above for descriptions and algorithms. 
    * Performance on most platforms is very sensitive to placement of 
    * instances of both WorkQueues and their arrays -- we absolutely 
    * do not want multiple WorkQueue instances or multiple queue 
    * arrays sharing cache lines. The @Contended annotation alerts 
    * JVMs to try to keep instances apart. 
    */ 
    @sun.misc.Contended 

/** 
    * Capacity of work-stealing queue array upon initialization. 
    * Must be a power of two; at least 4, but should be larger to 
    * reduce or eliminate cacheline sharing among queues. 
    * Currently, it is much larger, as a partial workaround for 
    * the fact that JVMs often place arrays in locations that 
    * share GC bookkeeping (especially cardmarks) such that 
    * per-write accesses encounter serious memory contention. 
    */ 
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 

    /** 
    * Maximum size for queue arrays. Must be a power of two less 
    * than or equal to 1 << (31 - width of array entry) to ensure 
    * lack of wraparound of index calculations, but defined to a 
    * value a bit less than this to help users trap runaway 
    * programs before saturating systems. 
    */ 
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 
Các vấn đề liên quan