11

A deadlock chết đói chủ đề xảy ra trong một nhóm chủ đề bình thường nếu tất cả các chủ đề trong hồ bơi đang chờ đợi các nhiệm vụ xếp hàng trong cùng một nhóm để hoàn thành. ForkJoinPool tránh vấn đề này bằng cách ăn cắp công việc từ các chủ đề khác từ bên trong cuộc gọi join(), thay vì chỉ đơn giản là chờ đợi. Ví dụ:Tôi có thể sử dụng hành vi trộm cắp công việc của ForkJoinPool để tránh bị bế tắc không?

private static class ForkableTask extends RecursiveTask<Integer> { 
    private final CyclicBarrier barrier; 

    ForkableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    protected Integer compute() { 
     try { 
      barrier.await(); 
      return 1; 
     } catch (InterruptedException | BrokenBarrierException e) { 
      throw new RuntimeException(e); 
     } 
    } 
} 

@Test 
public void testForkJoinPool() throws Exception { 
    final int parallelism = 4; 
    final ForkJoinPool pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism); 
    for (int i = 0; i < parallelism; ++i) { 
     forkableTasks.add(new ForkableTask(barrier)); 
    } 

    int result = pool.invoke(new RecursiveTask<Integer>() { 
     @Override 
     protected Integer compute() { 
      for (ForkableTask task : forkableTasks) { 
       task.fork(); 
      } 

      int result = 0; 
      for (ForkableTask task : forkableTasks) { 
       result += task.join(); 
      } 
      return result; 
     } 
    }); 
    assertThat(result, equalTo(parallelism)); 
} 

Nhưng khi sử dụng giao diện ExecutorService đến một ForkJoinPool thống, work-ăn cắp dường như không xảy ra. Ví dụ:

private static class CallableTask implements Callable<Integer> { 
    private final CyclicBarrier barrier; 

    CallableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    public Integer call() throws Exception { 
     barrier.await(); 
     return 1; 
    } 
} 

@Test 
public void testWorkStealing() throws Exception { 
    final int parallelism = 4; 
    final ExecutorService pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier)); 
    int result = pool.submit(new Callable<Integer>() { 
     @Override 
     public Integer call() throws Exception { 
      int result = 0; 
      // Deadlock in invokeAll(), rather than stealing work 
      for (Future<Integer> future : pool.invokeAll(callableTasks)) { 
       result += future.get(); 
      } 
      return result; 
     } 
    }).get(); 
    assertThat(result, equalTo(parallelism)); 
} 

Từ một cái nhìn lướt qua ForkJoinPool 's thực hiện, tất cả các ExecutorService API thường được thực hiện bằng ForkJoinTask s, vì vậy tôi không chắc chắn lý do tại sao một bế tắc xảy ra.

+2

Tôi không nghĩ rằng việc đánh cắp công việc tránh bế tắc. Khi bạn bế tắc, bạn không thể tiến bộ được. Công việc ăn cắp chỉ tránh hàng đợi không cân bằng bằng cách cho phép chủ đề ăn cắp từ hàng đợi khác nếu hàng đợi của họ trống. – markspace

+0

@markspace Khi triển khai 'ForkJoinTask',' join() 'cố gắng chạy các công việc khác từ deque hơn là trì hoãn, tránh được bế tắc. Vì 'ForkJoinPool.invokeAll()' chuyển đổi '' Callable 'thành' ForkJoinTask', tôi mong rằng nó sẽ hoạt động. –

Trả lời

20

Bạn gần như đã trả lời câu hỏi của riêng bạn. Giải pháp là tuyên bố rằng "ForkJoinPool tránh được vấn đề này bằng cách ăn cắp công việc từ các chủ đề khác từ bên trong cuộc gọi join()". Bất cứ khi nào các chủ đề bị chặn vì một số lý do khác ngoại trừ ForkJoinPool.join(), việc đánh cắp công việc này không xảy ra, và các luồng chỉ chờ đợi và không làm gì cả.

Lý do cho điều này là trong Java nó không thể cho các ForkJoinPool để ngăn chặn chủ đề của nó từ ngăn chặn và thay vào đó cung cấp cho họ cái gì khác để làm việc trên. Các chủ đề chính nó cần phải tránh chặn và thay vào đó yêu cầu các hồ bơi cho công việc cần làm. Và điều này chỉ được triển khai theo phương thức ForkJoinTask.join(), không phải trong bất kỳ phương pháp chặn nào khác. Nếu bạn sử dụng Future bên trong một ForkJoinPool, bạn cũng sẽ thấy bế tắc chết đói.

Tại sao việc đánh cắp công việc chỉ được thực hiện trong ForkJoinTask.join() và không có trong bất kỳ phương pháp chặn nào khác trong API Java? Vâng, có rất nhiều phương pháp chặn như vậy (Object.wait(), Future.get(), bất kỳ nguyên tắc đồng thời nào trong số java.util.concurrent, phương thức I/O, v.v.) và chúng không liên quan gì đến ForkJoinPool, đây chỉ là một lớp tùy ý trong API, vì vậy việc thêm trường hợp đặc biệt cho tất cả các phương pháp này sẽ là thiết kế xấu. Nó cũng sẽ dẫn đến các hiệu ứng rất đáng ngạc nhiên và không mong muốn.Hãy tưởng tượng ví dụ một người dùng chuyển một nhiệm vụ đến một số ExecutorService chờ một số Future và sau đó phát hiện ra rằng tác vụ bị treo rất dài trong Future.get() chỉ vì luồng đang chạy đã lấy trộm một số mục công việc khác (dài hạn) thay vì đợi Future và tiếp tục ngay lập tức sau khi có kết quả. Khi một thread bắt đầu làm việc trên một tác vụ khác, nó không thể trở lại nhiệm vụ ban đầu cho đến khi nhiệm vụ thứ hai kết thúc. Vì vậy, nó thực sự là một điều tốt mà các phương pháp chặn khác không làm việc ăn cắp. Đối với một ForkJoinTask, vấn đề này không tồn tại, bởi vì nó không quan trọng là nhiệm vụ chính được tiếp tục càng sớm càng tốt, nó chỉ quan trọng là tất cả các nhiệm vụ cùng nhau được xử lý hiệu quả nhất có thể.

Cũng không thể thực hiện phương pháp của riêng bạn để thực hiện việc đánh cắp công việc bên trong một ForkJoinPool, bởi vì tất cả các phần liên quan không được công khai.

Tuy nhiên, có thực sự là một phương pháp thứ hai như thế nào deadlocks đói có thể được ngăn chặn. Điều này được gọi là chặn được quản lý. Nó không sử dụng trộm cắp công việc (để tránh các vấn đề được đề cập ở trên), nhưng cũng cần các thread đó là có được khối để tích cực hợp tác với các hồ bơi thread. Với tính năng chặn được quản lý, luồng sẽ cho biết hồ bơi chủ đề có thể bị chặn trước khi nó gọi phương thức chặn có khả năng và cũng thông báo cho hồ bơi khi phương pháp chặn hoàn tất. Các hồ bơi thread sau đó biết rằng có một nguy cơ của một bế tắc chết đói, và có thể đẻ trứng thêm chủ đề nếu tất cả các chủ đề của nó hiện đang trong một số hoạt động chặn và vẫn còn các nhiệm vụ khác để thực thi. Lưu ý rằng điều này là ít hiệu quả hơn so với việc đánh cắp công việc, bởi vì chi phí của các chủ đề bổ sung. Nếu bạn thực hiện thuật toán song song đệ quy với tương lai thông thường và chặn được quản lý thay vì bằng ForkJoinTask và đánh cắp công việc, số lượng chuỗi bổ sung có thể rất lớn (vì trong giai đoạn "phân chia" của thuật toán, nhiều tác vụ sẽ được tạo và được đưa ra cho các chuỗi chặn ngay lập tức và chờ kết quả từ các tác vụ phụ). Tuy nhiên, một deadlock chết đói vẫn được ngăn chặn, và nó tránh được vấn đề rằng một nhiệm vụ phải chờ đợi một thời gian dài bởi vì thread của nó bắt đầu làm việc trên một nhiệm vụ khác trong thời gian đó.

Các ForkJoinPool của Java cũng hỗ trợ chặn được quản lý. Để sử dụng, cần thực hiện giao diện ForkJoinPool.ManagedBlocker sao cho phương thức chặn có khả năng mà tác vụ muốn thực hiện được gọi từ phương thức block của giao diện này. Sau đó, nhiệm vụ có thể không gọi trực tiếp phương thức chặn, nhưng thay vào đó, cần gọi phương thức tĩnh ForkJoinPool.managedBlock(ManagedBlocker). Phương pháp này xử lý giao tiếp với nhóm luồng trước và sau khi chặn. Nó cũng hoạt động nếu nhiệm vụ hiện tại không được thực hiện trong một ForkJoinPool, sau đó nó chỉ gọi phương thức chặn.

Nơi duy nhất tôi tìm thấy trong API Java (dành cho Java 7) thực sự sử dụng chặn được quản lý là lớp Phaser. Vì vậy, đồng bộ hóa với một Phaser bên trong một nhiệm vụ ForkJoinPool nên sử dụng quản lý chặn và có thể tránh deadlocks chết đói (nhưng ForkJoinTask.join() vẫn còn thích hợp hơn vì nó sử dụng công việc ăn cắp thay vì chặn được quản lý). Tính năng này hoạt động bất kể bạn sử dụng trực tiếp ForkJoinPool hay thông qua giao diện ExecutorService của nó. Tuy nhiên, nó sẽ không hoạt động nếu bạn sử dụng bất kỳ số nào khác ExecutorService giống như được tạo bởi lớp Executors, vì chúng không hỗ trợ chặn được quản lý.

Trong Scala, việc sử dụng chặn được quản lý rộng rãi hơn (description, API).

+2

Cảm ơn câu trả lời này, rất toàn diện. Mặc dù vậy, một nitpick, việc thực hiện 'ForkJoinTask' thực hiện việc đánh cắp tương tự trong' get() 'như trong' join() '. Sự bế tắc trong câu hỏi của tôi chủ yếu xuất phát từ việc cố gắng đồng bộ hóa mà không có 'ForkJoinPool.managedBlock()' (trên thực tế, cả hai ví dụ bế tắc trên Java 7). Sử dụng 'Phaser' thay vào đó, cả hai có thể được thực hiện để làm việc. –

+0

Có lẽ bạn có thể làm rõ làm thế nào [trạng thái chặn luồng của Java] (http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.State.html) tương ứng với một chuỗi chặn trong hiện đại Scala cảm giác. Mọi hoạt động chặn có chờ trên khóa màn hình Java như được mô tả trên liên kết đó không? hoặc hồ bơi chủ đề có theo dõi trạng thái chặn theo cách khác không? – matanster

+0

@matt Tôi không chắc chắn ý bạn là gì bằng cách "chặn luồng trong ý nghĩa Scala hiện đại". Kết nối giữa 'State.BLOCKED' và chặn được quản lý chỉ là một luồng biết rằng nó có thể ở trạng thái BLOCKED sớm nên báo cho ForkJoinPool biết về trả trước này bằng cách gọi' managedBlock'. ForkJoinPool không cố gắng phát hiện xem các luồng hiện có bị chặn hay không. Và nếu một thread sử dụng 'managedBlock' nhưng không thực sự bị BLOCKED thì ForkJoinPool vẫn sẽ tăng số lượng các luồng. –

0

Tôi biết bạn đang làm gì nhưng tôi không biết tại sao. Ý tưởng về một rào chắn là các chủ đề độc lập nên có thể chờ đợi nhau để đạt được một điểm chung. Bạn không có chủ đề độc lập. hồ Chủ đề, F/J, là dành cho Data Parallelism

Bạn đang làm một cái gì đó hài hòa với Task Parallelism

Lý do F/J tiếp tục là khuôn khổ tạo ra "chủ đề tiếp tục" để tiếp tục lấy công việc từ deques khi tất cả các chuỗi công việc đang chờ.

+0

Các rào cản chỉ ở đó để đảm bảo mỗi tác vụ được lên lịch trên một chuỗi riêng biệt. Và tôi không nghĩ rằng "các chuỗi tiếp tục" là câu trả lời, nếu bạn in ra 'Thread.currentThread(). GetId()', bạn sẽ thấy rằng một trong các 'ForkableTasks' chạy trong cùng một luồng với chuỗi gọi phần còn lại, và chỉ có 4 chủ đề được sử dụng trong tổng số. –

+0

Bạn không thể đảm bảo chuỗi nào xử lý nhiệm vụ nào với việc đánh cắp công việc. Tất cả các tác vụ đổ vào cùng một hàng đợi gửi. Tùy thuộc vào việc bạn sử dụng bản phát hành nào (Java7/8) mà các khối công việc được thay thế bằng các chủ đề "tiếp tục" hoặc "bồi thường". Những gì bạn đang làm không phải là sở trường của F/J (Dữ liệu song song.) – edharned

0

Khi sử dụng một nhóm chủ đề có số lượng chuỗi và tác vụ hạn chế có thể chặn (ví dụ: theo tương lai.get()), có luôn có khả năng bị đói trong chuỗi. Sử dụng hồ bơi luồng không giới hạn (và sẵn sàng sử dụng OutOfMemoryError) hoặc sử dụng các tác vụ không chặn bằng cách tách các tác vụ chặn để bỏ chặn các phần được kích hoạt khi điều kiện bắt buộc được đáp ứng. Class Future không thể làm điều này (kích hoạt một nhiệm vụ), nhưng CompletableFuture từ Java8 có thể. Ngoài ra, hãy xem nhiều thư viện diễn viên và dataflow, ví dụ: của tôi df4j2

+0

Điều này chắc chắn đúng với các hồ bơi luồng truyền thống. Nhưng 'ForkJoinPool' thực hiện các API' ExecutorService' ở trên 'ForkJoinTask', vì vậy tôi mong đợi hành vi sẽ khớp. –

Các vấn đề liên quan