2017-04-06 12 views
6

Giả sử rằng tôi có một thiết lập ForkJoinPool với mức độ song song n, và tôi gọi một tính toán song song như thế này:ForkJoinPool và Future.Get

workpool.submit(
      () -> { 
        objects.values().parallelStream().forEach(obj -> { 
         obj.foo(); 
        }); 
       }); 

tôi làm điều này để đảm bảo rằng các chủ đề sinh ra có được tạo ra bên trong workpool (Tôi có các thành phần khác nhau của hệ thống cần được cách ly). Bây giờ giả định rằng chuỗi trong đó cái này được gọi, cũng đang thực hiện bên trong workpool này và tôi thực hiện:

Future<?> wait = workpool.submit(
      () -> { 
        objects.values().parallelStream().forEach(obj -> { 
         obj.foo(); 
        }); 
       }); 
wait.get() 

1) Tôi có chặn một luồng trong ForkJoinPool không? Nếu tôi có n chủ đề tất cả các khối trên tương lai, trong khi cố gắng để sắp xếp một nhiệm vụ trong workpool, điều này sẽ dẫn đến bế tắc? Nó không rõ ràng với tôi cho dù "mức độ tối đa của Parallellism" trong ForkJoinPool có nghĩa là (nếu có n không bị chặn nhiệm vụ), sẽ luôn có n chủ đề thực hiện, hoặc cho dù có một số cố định của chủ đề, bất kể cho dù có bị chặn. Điều gì sẽ xảy ra nếu tôi sử dụng wait.join() thay vì wait.join (tôi không cần kiểm tra ngoại lệ vì bất kỳ ngoại lệ nào được ném trong mã này sẽ tạo ra một runtimeexception. Nếu tôi hiểu chính xác, join() sẽ cho phép các luồng thực thi các tác vụ xếp hàng trong khi đợi)

2) tôi vẫn nhận được lợi ích của người nhẹ forkjoin nhiệm vụ của dòng song song nếu tôi đang tạo ra một "wrapper" lớp Runnable bằng cách thực hiện() -> {}

3) có Ngược lại/lộn ngược để sử dụng thay vào đó (giả sử rằng .join() thực sự thực hiện hành vi trộm cắp công việc mà tôi cho rằng):

 CompletableFuture.supplyAsync(this::mylambdafunction, workpool) 
      .thenAccept(this::mynextfunction); 

Trả lời

1

Trả lời điểm 1: Thật khó để biết liệu mã của bạn có bị chặn mà không thấy triển khai phương pháp thực tế hay không. Một cách tiếp cận để đối phó với mã chặn là tăng số lượng các luồng trong threadpool forkjoin. Thông thường, số lượng các chủ đề trong một luồng forkjoin là n + 1 cho các nhiệm vụ tính toán chuyên sâu, trong đó n = số bộ vi xử lý. Hoặc nếu bạn có I/O chặn, bạn có thể sử dụng ManagedBlocker.

đáp ứng chỉ 2:

đáp ứng cho điểm 3: Ưu điểm rõ ràng để mã completableFuture của bạn là thenAccept là không chặn. Vì vậy, kiểm soát sẽ ngay lập tức đi qua khối CompletableFuture của bạn để tuyên bố tiếp theo mà không cần chờ đợi trong khi trong mã trước đó bạn đã viết với một hồ bơi ForkJoin wait.get() sẽ chặn cho đến khi bạn có được một câu trả lời và sẽ không tiến hành cho đến lúc đó.

Các vấn đề liên quan