2016-06-06 19 views
6

Tôi đang tìm hiểu về điều hành RxJava, và tôi thấy những mã dưới đây không in bất cứ điều gì:Làm thế nào để sử dụng RxJava Interval hành

public static void main(String[] args) { 

    Observable 
    .interval(1, TimeUnit.SECONDS) 
    .subscribe(new Subscriber<Long>() { 
     @Override 
     public void onCompleted() { 
      System.out.println("onCompleted"); 
     } 

     @Override 
     public void onError(Throwable e) { 
      System.out.println("onError -> " + e.getMessage()); 
     } 

     @Override 
     public void onNext(Long l) { 
      System.out.println("onNext -> " + l); 
     } 
    }); 
} 

Như ReactiveX, interval

tạo Quan sát mà phát ra một dãy số nguyên được đặt cách nhau theo khoảng thời gian cụ thể

Tôi đã nhầm lẫn hoặc quên điều gì đó?

Trả lời

7

Bạn cần phải chặn cho đến khi có thể quan sát được tiêu thụ:

public static void main(String[] args) throws Exception { 

    CountDownLatch latch = new CountDownLatch(1); 

    Observable 
    .interval(1, TimeUnit.SECONDS) 
    .subscribe(new Subscriber<Long>() { 
     @Override 
     public void onCompleted() { 
      System.out.println("onCompleted"); 
      // make sure to complete only when observable is done 
      latch.countDown(); 
     } 

     @Override 
     public void onError(Throwable e) { 
      System.out.println("onError -> " + e.getMessage()); 
     } 

     @Override 
     public void onNext(Long l) { 
      System.out.println("onNext -> " + l); 
     } 
    }); 

    // wait for observable to complete (never in this case...) 
    latch.await(); 
} 

Bạn có thể thêm .take(10) ví dụ để xem toàn thể quan sát được.

+0

Tốt, một ví dụ rõ ràng về CountdownLatch để khởi động. – mtyson

+0

trong Rxjava 2, có cách nào để sử dụng khoảng thời gian với đơn? –

2

Đặt Thread.sleep(1000000) sau khi đăng ký và bạn sẽ thấy nó hoạt động. Observable.interval hoạt động theo mặc định trên Schedulers.computation() để luồng của bạn đang được chạy trên một chuỗi khác với chuỗi chính.

+0

Yeah, đó là chính xác. Interval operator chạy theo cách không đồng bộ, vì vậy tôi cần phải chặn abd wait để có được kết quả từ nó. Cảm ơn! –

0

Khi họ cho bạn biết khoảng thời gian hoạt động không đồng bộ, vì vậy bạn phải đợi cho tất cả các sự kiện kết thúc.

Bạn có thể nhận đăng ký sau khi đăng ký và sau đó sử dụng TestSubcriber là một phần của nền tảng reactiveX và sẽ cung cấp cho bạn tính năng chờ tất cả các sự kiện chấm dứt.

 @Test 
public void testObservableInterval() throws InterruptedException { 
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS) 
       .map(time-> "item emitted") 
       .subscribe(System.out::print, 
         item -> System.out.print("final:" + item)); 
    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 

tôi có trong github của tôi nhiều hơn ví dụ nếu bạn cần https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Các vấn đề liên quan