Nói rằng tôi có một API đó, dựa trên một số tiêu chí truy vấn, sẽ tìm thấy hoặc xây dựng một widget:không đồng bộ, giá trị trả về composable cho dữ liệu vector/suối trong Java 9
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
The (đồng bộ) mã khách hàng vẻ như:
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
Bây giờ việc tìm kiếm hoặc xây dựng một tiện ích đắt tiền không thể đoán trước và tôi không muốn khách hàng chặn trong khi chờ đợi. Vì vậy, tôi thay đổi nó thành:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
Khách hàng sau đó có thể viết một trong hai:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
hay:
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
Bây giờ, chúng ta hãy nói thay API đồng bộ có thể trở lại từ 0 đến n widget :
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
ngây thơ, tôi có thể viết:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
Tuy nhiên, điều này không thực sự làm cho đoạn code non-blocking, nó chỉ đẩy chặn xung quanh - một trong hai Future
khối cho đến khi tất cả các Widgets
có sẵn, hoặc mã lặp lại trên các khối Stream
đang chờ mỗi Widget
. Những gì tôi muốn là cái gì đó sẽ cho tôi xử lý mỗi widget khi họ đến nơi, một cái gì đó như:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
Nhưng điều này không cung cấp xử lý lỗi, và thậm chí nếu tôi bổ sung thêm một Consumer<Throwable> errorHandler
, nó không cho tôi , ví dụ: soạn truy xuất tiện ích của tôi với các truy vấn khác hoặc chuyển đổi kết quả.
Vì vậy, tôi đang tìm một số vật liệu tổng hợp kết hợp các đặc tính của Stream
(khả năng lặp, khả năng biến đổi) với các đặc tính của CompletableFuture
(kết quả không đồng bộ và xử lý lỗi). (Và, trong khi chúng ta đang ở đó, áp lực ngược lại có thể tốt đẹp.)
Đây có phải là java.util.concurrent.Flow.Publisher không? An io.reactivex.Observable? Một cái gì đó phức tạp hơn? Một cái gì đó đơn giản hơn?
Tôi có xu hướng đẩy mọi thứ vào hàng đợi và kéo mọi thứ ra khỏi hàng đợi. –