A deadlock chết đói chủ đề xảy ra trong một nhóm chủ đề bình thường nếu tất cả các chủ đề trong hồ bơi đang chờ đợi các nhiệm vụ xếp hàng trong cùng một nhóm để hoàn thành. ForkJoinPool
tránh vấn đề này bằng cách ăn cắp công việc từ các chủ đề khác từ bên trong cuộc gọi join()
, thay vì chỉ đơn giản là chờ đợi. Ví dụ:Tôi có thể sử dụng hành vi trộm cắp công việc của ForkJoinPool để tránh bị bế tắc không?
private static class ForkableTask extends RecursiveTask<Integer> {
private final CyclicBarrier barrier;
ForkableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
protected Integer compute() {
try {
barrier.await();
return 1;
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testForkJoinPool() throws Exception {
final int parallelism = 4;
final ForkJoinPool pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; ++i) {
forkableTasks.add(new ForkableTask(barrier));
}
int result = pool.invoke(new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
for (ForkableTask task : forkableTasks) {
task.fork();
}
int result = 0;
for (ForkableTask task : forkableTasks) {
result += task.join();
}
return result;
}
});
assertThat(result, equalTo(parallelism));
}
Nhưng khi sử dụng giao diện ExecutorService
đến một ForkJoinPool
thống, work-ăn cắp dường như không xảy ra. Ví dụ:
private static class CallableTask implements Callable<Integer> {
private final CyclicBarrier barrier;
CallableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public Integer call() throws Exception {
barrier.await();
return 1;
}
}
@Test
public void testWorkStealing() throws Exception {
final int parallelism = 4;
final ExecutorService pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
int result = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
// Deadlock in invokeAll(), rather than stealing work
for (Future<Integer> future : pool.invokeAll(callableTasks)) {
result += future.get();
}
return result;
}
}).get();
assertThat(result, equalTo(parallelism));
}
Từ một cái nhìn lướt qua ForkJoinPool
's thực hiện, tất cả các ExecutorService
API thường được thực hiện bằng ForkJoinTask
s, vì vậy tôi không chắc chắn lý do tại sao một bế tắc xảy ra.
Tôi không nghĩ rằng việc đánh cắp công việc tránh bế tắc. Khi bạn bế tắc, bạn không thể tiến bộ được. Công việc ăn cắp chỉ tránh hàng đợi không cân bằng bằng cách cho phép chủ đề ăn cắp từ hàng đợi khác nếu hàng đợi của họ trống. – markspace
@markspace Khi triển khai 'ForkJoinTask',' join() 'cố gắng chạy các công việc khác từ deque hơn là trì hoãn, tránh được bế tắc. Vì 'ForkJoinPool.invokeAll()' chuyển đổi '' Callable 'thành' ForkJoinTask', tôi mong rằng nó sẽ hoạt động. –