2013-07-31 53 views
21

Tôi đã chơi với CompletionStage/CompletableFuture trong Java 8 để xử lý không đồng bộ, hoạt động khá tốt. Tuy nhiên, đôi khi tôi muốn một giai đoạn để thực hiện xử lý không đồng bộ của một iterator/stream của các mục, và có vẻ như không phải là một cách để làm điều này.Có thể sử dụng API Java 8 Streams để xử lý không đồng bộ không?

Cụ thể, Stream.forEach() có ngữ nghĩa sau khi cuộc gọi tất cả các mục đã được xử lý. Tôi muốn điều tương tự, nhưng với một CompletionStage thay vào đó, ví dụ .:

CompletionStage<Void> done = stream.forEach(...); 
done.thenRun(...); 

Nếu Stream được hỗ trợ bởi một kết quả trực tuyến không đồng bộ này sẽ tốt hơn nhiều so với chờ đợi cho nó để được hoàn thành trong đoạn code trên chính nó.

Có thể xây dựng điều này với API Java 8 hiện tại không? Cách giải quyết?

+0

Xin chào! Tôi đã cập nhật câu trả lời của mình. Nếu nó sẽ làm hài lòng bạn, hãy bỏ phiếu lên Tôi đang cố kiếm thêm danh tiếng;) –

+2

@Rickard Có thể là quá muộn :) nhưng tôi đã tìm thấy thứ gì đó bạn muốn nếu tôi hiểu bạn một cách chính xác. http://www.reactive-streams.org/. https://github.com/reactor/reactor –

Trả lời

20

Theo như tôi biết, API luồng không hỗ trợ xử lý sự kiện không đồng bộ. Có vẻ như bạn muốn một cái gì đó giống như Tiện ích mở rộng phản ứng cho .NET và có một cổng Java có tên là RxJava, được tạo bởi Netflix.

RxJava hỗ trợ nhiều thao tác cấp cao giống như luồng Java 8 (chẳng hạn như bản đồ và bộ lọc) và không đồng bộ.

Cập nhật: Hiện tại có một sáng kiến ​​reactive streams trong công việc và có vẻ như JDK 9 sẽ bao gồm hỗ trợ cho ít nhất một phần của nó mặc dù lớp Flow.

+19

Thật vậy, RxJava là những gì bạn muốn. Trung tâm thiết kế cho luồng chủ yếu là về dữ liệu có thể được truy cập mà không có độ trễ (hoặc từ cấu trúc dữ liệu hoặc chức năng tạo); trung tâm thiết kế cho Rx là về các luồng sự kiện vô hạn có thể đến không đồng bộ. –

7

Khi @KarolKrol ám chỉ bạn có thể làm điều đó với luồng CompletableFuture.

Có thư viện xây dựng trên đầu dòng JDK8 để tạo điều kiện làm việc với các luồng CompletableFuture được gọi là cyclops-react.

Để soạn luồng, bạn có thể sử dụng API ike lời hứa thông thạo của cyclops hoặc bạn có thể sử dụng số Stages phản ứng đơn giản.

+0

Tôi quên bỏ phiếu những ngày này :) –

+0

@ KarolKról thay vì vận động hành lang cho upvotes, chỉ cần viết những câu trả lời tốt nhất có thể cho nhiều câu hỏi hơn. –

2

cyclops-react (Tôi là tác giả của thư viện này), cung cấp lớp học StreamUtils để xử lý Luồng. Một trong những hàm mà nó cung cấp là futureOperations, cung cấp quyền truy cập vào các hoạt động đầu cuối Stream chuẩn (và sau đó một số) với một twist - Luồng được thực hiện không đồng bộ và kết quả được trả về bên trong một CompletableFuture. .eg

Stream<Integer> stream = Stream.of(1,2,3,4,5,6) 
             .map(i->i+2); 
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream, 
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

Ngoài ra còn có một convience lớp ReactiveSeq rằng kết thúc tốt đẹp Stream và cung cấp các chức năng tương tự, với một API thạo đẹp

CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6) 
             .map(i->i+2) 
             .futureOperations(
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

Như Adam đã chỉ ra cyclops-react FutureStreams được thiết kế để xử lý dữ liệu không đồng bộ (bằng cách trộn tương lai và Streams với nhau) - nó đặc biệt thích hợp cho các hoạt động đa luồng liên quan đến việc chặn I/O (chẳng hạn như đọc các tập tin, thực hiện cuộc gọi db, thực hiện cuộc gọi còn lại vv).

1

Có thể tạo luồng, ánh xạ từng phần tử đến CompletionStage và thu thập kết quả bằng cách sử dụng CompletionStage.thenCombine(), nhưng mã kết quả sẽ không đọc được nhiều hơn, sau đó sử dụng đơn giản như thế này.

CompletionStage<Collection<Result>> collectionStage = 
     CompletableFuture.completedFuture(
      new LinkedList<>() 
     ); 

    for (Request request : requests) { 
     CompletionStage<Result> resultStage = performRequest(request); 
     collectionStage = collectionStage.thenCombine(
      resultStage, 
      (collection, result) -> { 
       collection.add(result); 
       return collection; 
      } 
     ); 
    } 

    return collectionStage; 

Ví dụ này có thể dễ dàng được chuyển thành chức năngĐể không mất khả năng đọc. Tuy nhiên, sử dụng số reduce hoặc collect của luồng yêu cầu mã bổ sung không tốt.

Cập nhật: CompletableFuture.allOfCompletableFuture.join cung cấp cách khác, dễ đọc hơn để chuyển đổi tập hợp các kết quả trong tương lai thành tập hợp kết quả trong tương lai.

Các vấn đề liên quan