2014-06-18 21 views
7

Tôi đang cố gắng tạo mẫu bằng rxjava. Mẫu sẽ phối hợp một ReactiveWareService và một ReactiveReviewService truyền lại một hỗn hợp WareAndReview.Cách đợi async Có thể quan sát để hoàn thành

ReactiveWareService 
     public Observable<Ware> findWares() { 
     return Observable.from(wareService.findWares()); 
    } 

ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency! 

public Observable<Review> findReviewsByItem(final String item) { 
return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> { 
    try { 
     List<Review> reviews = reviewService.findReviewsByItem(item); 
     reviews.forEach(observer::onNext); 
     observer.onCompleted(); 
    } catch (Exception e) { 
     observer.onError(e); 
    } 
})); 
} 

public List<WareAndReview> findWaresWithReviews() throws RuntimeException { 
final List<WareAndReview> wareAndReviews = new ArrayList<>(); 

wareService.findWares() 
    .map(WareAndReview::new) 
.subscribe(wr -> { 
     wareAndReviews.add(wr); 
     //Async!!!! 
     reviewService.findReviewsByItem(wr.getWare().getItem()) 
      .subscribe(wr::addReview, 
       throwable -> System.out.println("Error while trying to find reviews for " + wr) 
      ); 
    } 
); 

//TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion! 
try { 
    Thread.sleep(3000); 
} catch (InterruptedException e) {} 

return wareAndReviews; 
} 

Với thực tế tôi không muốn trả lại một quan sát, làm cách nào tôi có thể đợi Async Observable (findReviewsByItem) để hoàn thành?

Trả lời

-3

Một cách khác là khai báo đếm ngược trước khi bắt đầu. Sau đó gọi countDown() trên chốt đó trong onCompleted() của bạn. Sau đó bạn có thể thay thế Thread.sleep() của bạn bằng cách chờ đợi() trên chốt đó.

12

Hầu hết các ví dụ của bạn có thể được viết lại với những hoạt động RxJava tiêu chuẩn mà làm việc cùng nhau tốt:

public class Example { 

    Scheduler scheduler = Schedulers.from(executor); 

    public Observable<Review> findReviewsByItem(final String item) { 
     return Observable.just(item) 
       .subscribeOn(scheduler) 
       .flatMapIterable(reviewService::findReviewsByItem); 
    } 
    public List<WareAndReview> findWaresWithReviews() { 
     return wareService 
       .findWares() 
       .map(WareAndReview::new) 
       .flatMap(wr -> { 
        return reviewService 
          .findReviewsByItem(wr.getWare().getItem()) 
          .doOnNext(wr::addReview) 
          .lastOrDefault(null) 
          .map(v -> wr); 
       }) 
       .toList() 
       .toBlocking() 
       .first(); 
    } 
} 

Bất cứ khi nào bạn muốn soạn dịch vụ như thế này, suy nghĩ của flatMap đầu tiên. Bạn không cần phải chặn cho mỗi tiểu quan sát nhưng chỉ ở cuối cùng với toBlocking() nếu thực sự cần thiết.

Các vấn đề liên quan