2017-10-19 17 views
5

Nói rằng tôi có một API đó, dựa trên một số tiêu chí truy vấn, sẽ tìm thấy hoặc xây dựng một widget:không đồng bộ, giá trị trả về composable cho dữ liệu vector/suối trong Java 9

Widget getMatchingWidget(WidgetCriteria c) throws Throwable 

The (đồng bộ) mã khách hàng vẻ như:

try { 
    Widget w = getMatchingWidget(criteria); 
    processWidget(w); 
} catch (Throwable t) { 
    handleError(t); 
} 

Bây giờ việc tìm kiếm hoặc xây dựng một tiện ích đắt tiền không thể đoán trước và tôi không muốn khách hàng chặn trong khi chờ đợi. Vì vậy, tôi thay đổi nó thành:

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c) 

Khách hàng sau đó có thể viết một trong hai:

CompletableFuture<Widget> f = getMatchingWidget(criteria); 
f.thenAccept(this::processWidget) 
f.exceptionally(t -> { handleError(t); return null; }) 

hay:

getMatchingWidget(criteria).whenComplete((t, w) -> { 
    if (t != null) { handleError(t); } 
    else { processWidget(t); } 
}); 

Bây giờ, chúng ta hãy nói thay API đồng bộ có thể trở lại từ 0 đến n widget :

Stream<Widget> getMatchingWidgets(WidgetCriteria c) 

ngây thơ, tôi có thể viết:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c) 

Tuy nhiên, điều này không thực sự làm cho đoạn code non-blocking, nó chỉ đẩy chặn xung quanh - một trong hai Future khối cho đến khi tất cả các Widgets có sẵn, hoặc mã lặp lại trên các khối Stream đang chờ mỗi Widget. Những gì tôi muốn là cái gì đó sẽ cho tôi xử lý mỗi widget khi họ đến nơi, một cái gì đó như:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor) 

Nhưng điều này không cung cấp xử lý lỗi, và thậm chí nếu tôi bổ sung thêm một Consumer<Throwable> errorHandler, nó không cho tôi , ví dụ: soạn truy xuất tiện ích của tôi với các truy vấn khác hoặc chuyển đổi kết quả.

Vì vậy, tôi đang tìm một số vật liệu tổng hợp kết hợp các đặc tính của Stream (khả năng lặp, khả năng biến đổi) với các đặc tính của CompletableFuture (kết quả không đồng bộ và xử lý lỗi). (Và, trong khi chúng ta đang ở đó, áp lực ngược lại có thể tốt đẹp.)

Đây có phải là java.util.concurrent.Flow.Publisher không? An io.reactivex.Observable? Một cái gì đó phức tạp hơn? Một cái gì đó đơn giản hơn?

+2

Tôi có xu hướng đẩy mọi thứ vào hàng đợi và kéo mọi thứ ra khỏi hàng đợi. –

Trả lời

6

Trường hợp sử dụng của bạn rơi rất tự nhiên vào thế giới mà RxJava đang giải quyết. Nếu chúng ta có một quan sát được:

Observable<Widget> getMatchingWidgets(wc); 

sản xuất zero hoặc nhiều tiện ích dựa trên các tiêu chí, sau đó bạn có thể xử lý mỗi widget như nó xuất hiện sử dụng:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

Chuỗi thể quan sát được sẽ chạy trên backgroundScheduler , thường là trình bao bọc cho dịch vụ thi hành luồng-pool.Nếu bạn cần phải làm như xử lý cuối cùng của mỗi widget trong giao diện người dùng, bạn có thể sử dụng toán tử observeOn() để chuyển sang các Scheduler UI trước khi chế biến:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .observeOn(uiScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

Đối với tôi, sự sang trọng của cách tiếp cận RxJava là nó xử lý rất nhiều các loại hạt và bu lông quản lý đường ống một cách thông thạo. Nhìn vào chuỗi người quan sát đó, bạn biết chính xác những gì đang xảy ra và ở đâu.

Các vấn đề liên quan