2012-04-10 25 views
8

Có điều gì đó kỳ lạ về việc triển khai BoundedExecutor trong cuốn sách Java Concurrency in Practice.Java Concurrency trong Thực hành: điều kiện chủng tộc trong BoundedExecutor?

Đó là nghĩa vụ phải điều tiết công việc gửi tới Người thực thi bằng cách chặn luồng gửi khi có đủ chuỗi được xếp hàng hoặc chạy trong Trình thực thi.

này là việc thực hiện (sau khi thêm rethrow thiếu trong mệnh đề catch):

public class BoundedExecutor { 
    private final Executor exec; 
    private final Semaphore semaphore; 

    public BoundedExecutor(Executor exec, int bound) { 
     this.exec = exec; 
     this.semaphore = new Semaphore(bound); 
    } 

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { 
     semaphore.acquire(); 

     try { 
      exec.execute(new Runnable() { 
       @Override public void run() { 
        try { 
         command.run(); 
        } finally { 
         semaphore.release(); 
        } 
       } 
      }); 
     } catch (RejectedExecutionException e) { 
      semaphore.release(); 
      throw e; 
     } 
    } 

Khi tôi nhanh chóng BoundedExecutor với một Executors.newCachedThreadPool() và ràng buộc của 4, tôi mong chờ số chủ đề khởi tạo bởi các hồ bơi lưu trữ thread không bao giờ vượt quá 4. Trong thực tế, tuy nhiên, nó không. Tôi đã nhận được chương trình thử nghiệm nhỏ này để tạo ra tối đa 11 chủ đề:

public static void main(String[] args) throws Exception { 
    class CountingThreadFactory implements ThreadFactory { 
     int count; 

     @Override public Thread newThread(Runnable r) { 
      ++count; 
      return new Thread(r); 
     }   
    } 

    List<Integer> counts = new ArrayList<Integer>(); 

    for (int n = 0; n < 100; ++n) { 
     CountingThreadFactory countingThreadFactory = new CountingThreadFactory(); 
     ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory); 

     try { 
      BoundedExecutor be = new BoundedExecutor(exec, 4); 

      for (int i = 0; i < 20000; ++i) { 
       be.submitTask(new Runnable() { 
        @Override public void run() {} 
       }); 
      } 
     } finally { 
      exec.shutdown(); 
     } 

     counts.add(countingThreadFactory.count); 
    } 

    System.out.println(Collections.max(counts)); 
} 

Tôi nghĩ rằng có một khung thời gian nhỏ giữa việc phát hành semaphore và kết thúc nhiệm vụ, trong đó một sợi khác có thể chứa giấy phép và gửi tác vụ trong khi chuỗi phát hành chưa hoàn thành. Nói cách khác, nó có một điều kiện chủng tộc.

Ai đó có thể xác nhận điều này không?

+1

Tôi đã thêm 1ms Thread.sleep ngay sau semaphore.release() để xem mức độ tồi tệ hơn bao nhiêu: Tôi nhận được hơn 300 chuỗi được tạo. – toto2

Trả lời

2

Tôi thấy tối đa 9 chuỗi được tạo cùng một lúc. Tôi nghi ngờ có một điều kiện chủng tộc gây ra có nhiều chủ đề hơn yêu cầu.

Điều này có thể là do có trước và sau khi chạy công việc cần thực hiện. Điều này có nghĩa rằng mặc dù chỉ có 4 luồng bên trong khối mã của bạn, có một số luồng dừng một nhiệm vụ trước đó hoặc chuẩn bị sẵn sàng để bắt đầu một tác vụ mới.

tức là chuỗi phát hành() trong khi nó vẫn đang chạy. Mặc dù điều cuối cùng bạn làm không phải là điều cuối cùng trước khi có được một nhiệm vụ mới.

+0

Tôi đồng ý với tuyên bố của bạn, đó cũng chính là điều mà Bossie bị nghi ngờ ngay từ đầu. Nhưng tôi không chắc chắn tôi sẽ gọi đây là một điều kiện chủng tộc vì nó ngụ ý một số lỗi lập trình: Tôi không nghĩ rằng chương trình được cho là có số lượng chủ đề tối đa khi nó được viết. – toto2

+0

@ toto2 Nó không phải là một lỗi lập trình thường xuyên, nhưng có một điều kiện chủng tộc giữa Semaphore được phát hành và các chủ đề có được một nhiệm vụ mới. Nếu nhiệm vụ ở đó lâu hơn bạn chỉ có thể thấy hành vi này hiếm khi. –

+1

Tôi hiểu về chiều dài nhiệm vụ (xem nhận xét của tôi về câu hỏi của Bossie). Tôi chỉ có nghĩa là nó không phải là một lỗi vì không có gì xác định rằng cần có 4 chủ đề. "Nó không phải là một lỗi, đó là một tính năng!" Trong trường hợp này tôi nghĩ ai đó thực sự có thể đưa ra yêu sách đó. Xin lỗi ... Tôi chỉ nhận được triết học ở đây. – toto2

5

Bạn đang phân tích chính xác về điều kiện chủng tộc. Không có sự đảm bảo đồng bộ hóa giữa ExecutorService & Semaphore.

Tuy nhiên, tôi không biết nếu điều chỉnh số lượng chủ đề là những gì BoundedExecutor được sử dụng cho. Tôi nghĩ rằng đó là nhiều hơn cho throttling số lượng các nhiệm vụ được gửi đến dịch vụ. Hãy tưởng tượng nếu bạn có 5 triệu tác vụ cần gửi và nếu bạn gửi nhiều hơn thì 10.000 trong số đó sẽ hết bộ nhớ.

Bạn sẽ chỉ có 4 luồng chạy tại bất kỳ thời điểm nào, tại sao bạn muốn thử và xếp hàng lên tất cả 5 triệu tác vụ? Bạn có thể sử dụng một cấu trúc tương tự như thế này để điều tiết số nhiệm vụ xếp hàng tại bất kỳ thời điểm nào. Những gì bạn nên nhận ra điều này là tại bất kỳ thời điểm nào chỉ có 4 nhiệm vụ đang chạy.

Rõ ràng độ phân giải cho điều này là sử dụng Executors.newFixedThreadPool(4).

+1

Chính xác. Tác vụ được gửi đến trình thực thi bên dưới chạy lệnh 'phát hành semaphore trước khi nó hoàn thành, nhưng điều này xảy ra trong chuỗi chạy thi hành nhiệm vụ. Điều này cho phép một nhiệm vụ khác được gửi đến người thi hành và được gán cho một chủ đề mới, về mặt lý thuyết cho phép gấp đôi số chủ đề là 'ràng buộc' với điều kiện đúng. –

+1

@ John: nó làm giảm số lượng tác vụ được gửi, nhưng theo cách không thể đoán trước và không đáng tin cậy. Nếu các luồng được lên lịch theo cách không may, điều này có thể có nghĩa là nhiều luồng đệ trình có thể lẻn vào. Ngoài ra, trong trường hợp của 'newFixedThreadPool()', các tác vụ này vẫn có thể xếp chồng lên hàng đợi không bị chặn của 'Executor' , vẫn mạo hiểm hết bộ nhớ. @ David: Tôi đã nhận được nhiều như 11 chủ đề với một ràng buộc của 4. Ngoài ra, tôi nghĩ rằng đó là buồn cười mà các cuốn sách tham khảo về Java đồng thời vẫn có điều kiện chủng tộc trong đó. –

+0

@Bossie Bạn nói đúng, bạn vẫn còn nghi ngờ với OOM với newFixedThreadPool nhưng điều đó sẽ ngăn chặn các chuỗi dư thừa được tạo ra. Bạn sử dụng một ràng buộc để ngăn chặn, như tôi đã đề cập, hàng triệu tác vụ được gửi cùng một lúc. –

10

BoundedExecutor thực sự được dự định như là một minh hoạ về cách điều tiết tác vụ gửi bài chứ không phải là cách để đặt giới hạn về kích thước nhóm luồng. Có nhiều cách trực tiếp hơn để đạt được thứ hai, như ít nhất một bình luận đã chỉ ra.

Nhưng câu trả lời khác không đề cập đến nội dung trong sách nói rằng sử dụng một hàng đợi vô biên và

set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow, since the semaphore is bounding the number of tasks both currently executing and awaiting execution. [JCiP, end of section 8.3.3]

Bằng nhắc đến hàng đợi vô biên và kích thước hồ bơi, chúng tôi đã ngụ ý (rõ ràng không phải là rất rõ ràng) việc sử dụng hồ bơi có kích thước bị chặn.

Điều gì đã luôn làm phiền tôi về BoundedExecutor, tuy nhiên, là nó không thực hiện giao diện ExecutorService. Một cách hiện đại để đạt được chức năng tương tự và vẫn thực hiện các giao diện chuẩn sẽ là sử dụng phương pháp listeningDecorator của Guava và lớp ForwardingListeningExecutorService.

+0

Câu trả lời từ một trong các chuyên gia, điều đó thật tuyệt vời. Cảm ơn vì đã làm mọi thứ rõ ràng hơn một chút Tim. Những 'ListenableFuture' s có vẻ thú vị. Vì vậy, tôi phải gửi một vài nhiệm vụ, và bất cứ khi nào một trong những hoàn thành, tôi gửi một cái mới trong cuộc gọi lại, phải không? –

+0

+1 từ nguồn –

+0

@Bossie - Bạn có thể làm điều đó, chắc chắn, nhưng tôi có nghĩa là bạn có thể tiếp tục sử dụng cách tiếp cận Semaphore của BoundedExecutorService bằng cách trang trí thực thi với một sự thu nhận và gọi lại với một bản phát hành. –

Các vấn đề liên quan