2009-10-28 30 views
23

Tôi đã gặp phải sự cố hai lần ngay bây giờ, theo đó một chuỗi nhà sản xuất tạo ra các mục công việc N, gửi chúng đến ExecutorService và sau đó cần đợi cho đến khi tất cả các mục N đã được xử lý.CountDownLatch linh hoạt?

Hãy cẩn thận

  • N là không được biết trước. Nếu tôi chỉ đơn giản là tạo ra một CountDownLatch và sau đó có chủ đề sản xuất await() cho đến khi tất cả công việc được hoàn thành.
  • Sử dụng CompletionService không phù hợp vì mặc dù chủ đề sản xuất của tôi cần chặn (tức là bằng cách gọi take()) có không có cách nào báo hiệu rằng tất cả công việc đã hoàn tất.

giải pháp ưa chuộng hiện tại của tôi là sử dụng một bộ đếm số nguyên, và để increment này bất cứ khi nào một mục công việc được gửi và sụt lần nó khi một hạng mục công trình được xử lý. Sau khi đăng ký tất cả các nhiệm vụ của N, luồng sản xuất của tôi sẽ cần phải chờ trên khóa, kiểm tra xem có counter == 0 bất cứ khi nào nó được thông báo hay không. Chủ đề của người tiêu dùng sẽ cần phải thông báo cho nhà sản xuất nếu nó đã giảm giá bộ đếm và giá trị mới là 0.

Có cách tiếp cận tốt hơn cho vấn đề này hoặc có cấu trúc phù hợp trong java.util.concurrent Tôi nên sử dụng hơn là "lăn của riêng tôi"?

Xin cảm ơn trước.

+0

Vào thời điểm nào nhà sản xuất biết có bao nhiêu mục công việc? Khi sản phẩm cuối cùng được sản xuất? –

+0

Giải pháp hiện tại của bạn có thể bị điều kiện chạy đua: sản xuất vật phẩm 1 -> counter ++ -> mặt hàng 1 -> counter-- -> sản xuất mặt hàng 2. Vì bộ đếm đã bị giảm trước khi nhà sản xuất sản xuất vật phẩm tiếp theo, nhà sản xuất nghĩ rằng anh ta đã sẵn sàng. –

+0

@rwwilden: Bạn đúng trong trường hợp này có thể xảy ra. Tuy nhiên, nhà sản xuất của tôi sẽ chỉ kiểm tra/chờ trên quầy sau khi gửi * tất cả * mục công việc và vì vậy nó không đại diện cho một điều kiện chủng tộc trong trường hợp cụ thể này. – Adamski

Trả lời

24

java.util.concurrent.Phaser có vẻ như nó sẽ hoạt động tốt cho bạn. Nó được lên kế hoạch phát hành trong Java 7 nhưng phiên bản ổn định nhất có thể được tìm thấy trên trang web của nhóm lợi ích của jsr166.

Phaser là Thanh chắn tuần hoàn được tôn vinh. Bạn có thể đăng ký số lượng của các bên và khi bạn sẵn sàng chờ đợi trước của họ ở giai đoạn cụ thể.

Một ví dụ nhanh về cách nó sẽ làm việc:

final Phaser phaser = new Phaser(); 

public Runnable getRunnable(){ 
    return new Runnable(){ 
     public void run(){ 
      ..do stuff... 
      phaser.arriveAndDeregister(); 
     } 
    }; 
} 
public void doWork(){ 
    phaser.register();//register self 
    for(int i=0 ; i < N; i++){ 
     phaser.register(); // register this task prior to execution 
     executor.submit(getRunnable()); 
    } 
    phaser.arriveAndAwaitAdvance(); 
} 
+3

Cool - Cảm ơn; Điều này trông giống như những gì tôi đang sau ... Không thể tin rằng họ gọi nó là Phaser mặc dù. – Adamski

+2

Ước gì tôi có thể upvote x100 lần nữa. Cảm ơn rất nhiều. –

+2

Nếu điểm của 'getRunnable()' là đại diện cho một nhiệm vụ chỉ báo hiệu phaser và sau đó kết thúc, bạn muốn gọi 'phaser.arriveAndDeregister()' và ** not ** 'phaser.arrive()' như cách khác khi nhiệm vụ cha mẹ gọi 'phaser.arriveAndAwaitAdvance()' lần thứ hai nó sẽ bế tắc, vì nhiệm vụ sẽ đợi các tác vụ đã hoàn thành vẫn được đăng ký trong phaser. –

2

Bạn có thể sử dụng tất nhiên một CountDownLatch bảo vệ bởi một AtomicReference để nhiệm vụ của bạn được bao bọc như sau:

public class MyTask extends Runnable { 
    private final Runnable r; 
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; } 

    public void run() { 
     r.run(); 
     while (l.get() == null) Thread.sleep(1000L); //handle Interrupted 
     l.get().countDown(); 
    } 
} 

Thông báo rằng nhiệm vụ điều hành công việc của họ và sau đó quay cho đến khi đếm xuống là thiết lập (tức là tổng số nhiệm vụ được biết). Ngay sau khi đếm ngược được thiết lập, họ đếm nó xuống và thoát. Những nhận nộp như sau:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>(); 
executor.submit(new MyTask(r, l)); 

Sau điểm tạo/trình công việc của bạn, khi bạn biết có bao nhiêu công việc bạn đã tạo:

latch.set(new CountDownLatch(nTasks)); 
latch.get().await(); 
+0

Tại sao gói CountDownLatch với AtomicReference lại giúp bạn trong trường hợp này? Tham chiếu không cần phải được bảo vệ, vì nó được chuyển tới MyTask trong hàm khởi tạo của nó và không bao giờ thay đổi sau đó. – Avi

+0

Ngoài ra, hãy chắc chắn để xử lý throwables tùy ý (RuntimeExceptions và lỗi) trong run() phương pháp, ít nhất bằng cách đặt coundDown() trong một khối {} cuối cùng. – Avi

+0

Và tất nhiên, câu hỏi cụ thể là về trường hợp khi bạn không biết số nhiệm vụ tại thời điểm tạo nhiệm vụ. – Avi

0

tôi giả sử sản xuất của bạn không không cần biết khi hàng đợi trống nhưng cần biết khi nào nhiệm vụ cuối cùng đã được hoàn thành.

Tôi sẽ thêm phương thức waitforWorkDone(producer) cho người tiêu dùng. Nhà sản xuất có thể thêm nhiệm vụ N của nó và gọi phương thức chờ. Phương thức chờ chặn luồng đến iff hàng đợi công việc không rỗng và không có nhiệm vụ nào đang thực hiện tại thời điểm này.

Chủ đề người tiêu dùng notifyAll() trên waitfor lock iff nhiệm vụ của nó đã được hoàn thành, hàng đợi trống và không có nhiệm vụ nào khác đang được thực thi.

1

Tôi đã sử dụng một ExecutorCompletionService cho một cái gì đó như thế này:

ExecutorCompletionService executor = ...; 
int count = 0; 
while (...) { 
    executor.submit(new Processor()); 
    count++; 
} 

//Now, pull the futures out of the queue: 
for (int i = 0; i < count; i++) { 
    executor.take().get(); 
} 

này liên quan đến việc giữ một danh sách các nhiệm vụ đã được gửi đi, vì vậy nếu danh sách của bạn là tùy tiện lâu, phương pháp của bạn có thể được tốt hơn.

Nhưng hãy đảm bảo sử dụng AtomicInteger để điều phối, để bạn có thể tăng nó trong một chuỗi và giảm nó trong chuỗi công việc.

+0

Làm cách nào để bạn có thể 'chờ đợi'? Tắt dịch vụ của bạn là gì? Ngoài ra, bạn không thể sử dụng lại dịch vụ hoàn thành trong câu trả lời của mình, bởi vì bạn có thể chờ đợi các tác vụ đã được một phần logic của * bộ khác *. Và (như đã đề cập trong phần bình luận cho câu trả lời của tôi), bạn vẫn cần phải biết 'nTasks' tại một số điểm –

+0

@oxbow_lakes: True. Đó là lý do tại sao tôi chỉ có nó như là một lựa chọn (mà tôi đã chỉnh sửa, vì nó không giúp ích trong trường hợp này). Tôi giả định rằng dịch vụ hoàn thành này sẽ không được sử dụng cho một tập hợp các tác vụ khác cùng một lúc (chồng chéo) với công việc này. Nó có thể được sử dụng lại sau, từ cùng một luồng. – Avi

+0

Ưu điểm của việc theo dõi số lượng nhiệm vụ thay vì sử dụng AtomicBoolean, là bạn không phải xử lý wait() và thông báo() theo cách thủ công - hàng đợi sẽ xử lý điều đó. Tất cả bạn phải theo dõi là số lượng các mục trong hàng đợi. – Avi

0
+0

Bạn có thể giải thích thêm không? Câu trả lời trên Stack Overflow phải chứa nội dung giải thích từ các nguồn được liên kết để được coi là đủ. – VermillionAzure

0

gì bạn được mô tả khá giống với sử dụng Semaphore tiêu chuẩn nhưng được sử dụng 'ngược lại'.

  • semaphore của bạn bắt đầu với 0 phép
  • Mỗi đơn vị công tác phát hành một giấy phép khi nó hoàn thành
  • Bạn chặn bằng cách chờ đợi để có được N cho phép

Thêm vào đó bạn có thể linh động chỉ được M < N giấy phép hữu ích nếu bạn muốn kiểm tra trạng thái trung gian. Ví dụ tôi đang thử nghiệm một hàng đợi thông điệp không đồng bộ nên tôi mong đợi hàng đợi đầy đủ cho một số M < N vì vậy tôi có thể có được M và kiểm tra xem hàng đợi có thực sự đầy đủ không, sau đó lấy các N-M còn lại xếp hàng.

Các vấn đề liên quan