2016-07-31 14 views
5

Tôi gặp sự cố khi triển khai giải pháp kiểu chức năng trang nhã cho chương trình cần phối hợp nhiệm vụ khác nhau. Đây là những gì tôi muốn đạt được.Triển khai phối hợp tác vụ

tôi có ba lớp học có phương pháp tôi muốn dàn xếp (giản thể cho ngắn gọn):

class TaskA { 
    public ResultA call() { 
     return new ResultA(); 
    } 
} 

class TaskB { 
    public ResultB call(ResultA a) { 
     return new ResultB(); 
    } 
} 

class TaskC { 
    public ResultC call(List<ResultB> resultBs) { 
     return new ResultC(); 
    } 
} 

tôi cần phải thực hiện TaskA 'n' lần song song và đối với từng thực hiện TaskA, tôi cần phải thực hiện TaskB 'n' lần sử dụng kết quả của TaskA tương ứng. Cuối cùng, tôi cần phải thực hiện TaskC sau khi sử dụng kết quả của tất cả các yêu cầu của TaskB.

Một cách để đạt được điều này sẽ tạo ra một Callable rằng gói gọn cuộc gọi đến TaskATaskB và cuối cùng trong chủ đề chính của tôi, thu thập các List của Future s của ResultB để thực hiện TaskC:

class TaskATaskBCallable implements Callable<ResultB> { 
    private TaskA taskA ...; 
    private TaskB taskB ...; 

    public ResultB call() { 
     return taskB.call(taskA.call()); 
    } 
} 

Và trong chủ đề chính của tôi:

private ResultC orchestrate() { 
    ExecutorService service = ...; 
    List<Callable<ResultB>> callables = ...; 

    taskC.call(callables.map(callable -> 
     service.submit(callable)).map(Future::get).collect(Collectors.toList()); 
} 

Một điều tôi không thích về giải pháp này là TaskATaskBCallable. Điều này có lẽ là một khớp nối không cần thiết lớp TaskATaskB. Hơn nữa, nếu tôi phải kết nối một công việc khác với TaskATaskB, tôi sẽ phải sửa đổi TaskATaskBCallable cũng có thể sửa đổi tên của nó. Tôi cảm thấy tôi có thể loại bỏ nó bằng cách sử dụng thông minh hơn các lớp thư viện đồng thời Java như CompletableFuture hoặc Phaser.

Mọi con trỏ?

Trả lời

0

Tôi tìm thấy một cách để làm điều này bằng CompletableFuture:

private ResultC orchestrate() { 
    ExecutorService service = ...; 
    int taskCount = ...; 

    List<CompletableFuture<ResultB>> resultBFutures = IntStream.rangeClosed(1, taskCount) 
        .mapToObj((i) -> CompletableFuture.supplyAsync(() -> new TaskA().call(), service)) 
        .map(resultAFuture -> resultAFuture.thenApplyAsync(resultA -> new TaskB().call(resultA), 
            service)) 
        .collect(Collectors.toList()); 

    return new TaskC().call(CompletableFuture.allOf(resultBFutures.toArray(new CompletableFuture[resultBFutures.size()])) 
        .thenApply(v -> resultBFutures.stream().map(CompletableFuture::join) 
            .collect(Collectors.toList())) 
        .join()); 
} 
-1

Tôi nghĩ rằng thực sự CompletableFuture sẽ chứng minh thanh lịch nhất:

int taskCount = 100; 

    List<ResultB> resultBs = IntStream.range(0, taskCount) 
      .mapToObj(i -> new TaskA()) 
      .map(taskA -> CompletableFuture.supplyAsync(taskA::call)) 
      .map(completableFutureA -> completableFutureA.thenApplyAsync(new TaskB()::call)) 
      .collect(Collectors.toList()) // collect, in order to kick off the async tasks 
      .stream() 
      .map(CompletableFuture::join) 
      .collect(Collectors.toList()); 
    return new TaskC().call(resultBs); 
+0

Không hẳn, bạn 'CompletableFuture :: join' làm cho toàn bộ điều nối tiếp. Giải pháp của bạn thực thi đồ thị sau trong chuỗi: 'TaskA-> TaskB-> TaskA-> TaskB ....-> TaskC' –

+0

@SwarangaSarma bạn nói đúng, tôi cần thu thập trước khi tham gia hoặc đánh giá lười biếng của Stream vít me :) – bowmore

Các vấn đề liên quan