2015-05-05 12 views
6

Tôi đang cố gắng viết một chương trình đơn giản bằng cách sử dụng RxJava để tạo chuỗi số tự nhiên vô hạn. Vì vậy, đến nay tôi đã tìm thấy hai cách để tạo chuỗi các số bằng cách sử dụng Observable.timer()Observable.interval(). Tôi không chắc chắn nếu các chức năng này là đúng cách để tiếp cận vấn đề này. Tôi đã mong đợi một hàm đơn giản giống như chúng ta có trong Java 8 để tạo ra các số tự nhiên vô hạn.Tạo chuỗi số tự nhiên vô hạn bằng RxJava

IntStream.iterate (1, value -> value +1) .forEach (System.out :: println);

Tôi đã thử sử dụng IntStream với Observable nhưng điều đó không hoạt động chính xác. Nó gửi luồng vô hạn của các số chỉ cho người đăng ký đầu tiên. Làm thế nào tôi có thể tạo một chuỗi số tự nhiên vô hạn?

import rx.Observable; 
import rx.functions.Action1; 

import java.util.stream.IntStream; 

public class NaturalNumbers { 

    public static void main(String[] args) { 
     Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
      IntStream stream = IntStream.iterate(1, val -> val + 1); 
      stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber)); 
     }); 

     Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber); 
     Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber); 
     Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber); 
     naturalNumbers.subscribe(first); 
     naturalNumbers.subscribe(second); 
     naturalNumbers.subscribe(third); 

    } 
} 

Trả lời

3

Vấn đề là về naturalNumbers.subscribe(first);, các OnSubscribe bạn thực hiện đang được gọi và bạn đang làm một forEach trên một dòng suối vô hạn, do vậy tại sao chương trình của bạn không bao giờ chấm dứt.

Một cách bạn có thể giải quyết là đăng ký chúng không đồng bộ trên một chuỗi khác. Để dễ dàng nhận thấy kết quả tôi đã giới thiệu một giấc ngủ vào chế biến Suối:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
    IntStream stream = IntStream.iterate(1, i -> i + 1); 
    stream.peek(i -> { 
     try { 
      // Added to visibly see printing 
      Thread.sleep(50); 
     } catch (InterruptedException e) { 
     } 
    }).forEach(subscriber::onNext); 
}); 

final Subscription subscribe1 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(first); 
final Subscription subscribe2 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(second); 
final Subscription subscribe3 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(third); 

Thread.sleep(1000); 

System.out.println("Unsubscribing"); 
subscribe1.unsubscribe(); 
subscribe2.unsubscribe(); 
subscribe3.unsubscribe(); 
Thread.sleep(1000); 
System.out.println("Stopping"); 
+0

Cảm ơn Mike cho câu trả lời của bạn. Nó sẽ là bất kỳ khác nhau nếu tôi gọi phương thức subscribeOn trong khi tạo Observable thay vì gọi nó ba lần như được hiển thị trong đoạn mã trên. Tôi đã thử nghiệm nó và hành vi là như nhau nhưng vẫn muốn xác nhận. – Shekhar

+0

Vấn đề này được xác định chính xác, nhưng đây là lời khuyên xấu - bạn không bao giờ nên sử dụng 'subscribeOn' để giải quyết vấn đề này - xem câu trả lời của tôi cho lý do. –

+1

Gọi 'hủy đăng ký' theo cách này ngắt kết nối thuê bao, vì vậy nó dừng nhận tin nhắn, nhưng nó không dừng vòng lặp của máy phát điện, điều này liên tục chạy vô hạn việc ăn năng lượng CPU của bạn. Xem câu trả lời của tôi về cách giải quyết cả hai mặt của câu chuyện. –

2

Observable.Generate là chính xác các nhà điều hành để giải quyết lớp này của vấn đề chủ động. Tôi cũng giả định đây là một ví dụ sư phạm, vì việc sử dụng lặp lại cho điều này có lẽ tốt hơn.

Mã của bạn tạo toàn bộ luồng trên chuỗi của người đăng ký. Vì đây là luồng vô hạn nên cuộc gọi subscribe sẽ không bao giờ hoàn thành. Ngoài vấn đề rõ ràng đó, việc hủy đăng ký cũng sẽ có vấn đề vì bạn không kiểm tra nó trong vòng lặp của mình.

Bạn muốn sử dụng bộ lập lịch để giải quyết vấn đề này - chắc chắn không sử dụng subscribeOn vì điều đó sẽ gây gánh nặng cho tất cả các nhà quan sát. Lập lịch phân phối từng số tới onNext - và là bước cuối cùng trong mỗi hành động được lên lịch, hãy lên lịch cho một số tiếp theo.

Về cơ bản đây là những gì Observable.generate cung cấp cho bạn - mỗi lần lặp được lên lịch trên trình lên lịch được cung cấp (mặc định là lịch trình giới thiệu đồng thời nếu bạn không chỉ định). Các hoạt động của trình lập lịch biểu có thể bị hủy và tránh sự đói chết của luồng.

Rx.NET giải quyết nó như thế này (trên thực tế có một mô hình async/await đó là tốt hơn, nhưng không có sẵn trong Java afaik):

static IObservable<int> Range(int start, int count, IScheduler scheduler) 
{ 
    return Observable.Create<int>(observer => 
    { 
     return scheduler.Schedule(0, (i, self) => 
     { 
      if (i < count) 
      { 
       Console.WriteLine("Iteration {0}", i); 
       observer.OnNext(start + i); 
       self(i + 1); 
      } 
      else 
      { 
       observer.OnCompleted(); 
      } 
     }); 
    }); 
} 

Hai điều cần lưu ý ở đây:

  • Cuộc gọi để Schedule trả về một handle handle được trả lại cho người quan sát
  • Schedule là đệ quy - tham số self là tham chiếu đến bộ lập lịch được sử dụng để gọi lặp tiếp theo. Điều này cho phép hủy đăng ký để hủy hoạt động.

Không chắc chắn điều này trông như thế nào trong RxJava, nhưng ý tưởng phải giống nhau.Một lần nữa, Observable.generate có lẽ sẽ đơn giản hơn cho bạn vì nó được thiết kế để xử lý kịch bản này.

1

Khi tạo sequencies vô hạn chăm sóc cần được thực hiện để:

  1. đăng ký và thực hiện trên chủ đề khác nhau; nếu không, bạn sẽ chỉ phục vụ người đăng ký duy nhất
  2. ngừng tạo các giá trị ngay sau khi đăng ký chấm dứt; nếu không vòng lặp chạy sẽ ăn CPU của bạn

Vấn đề đầu tiên được giải quyết bằng cách sử dụng subscribeOn(), observeOn() và nhiều bộ lập lịch khác nhau.

Vấn đề thứ hai được giải quyết tốt nhất bằng cách sử dụng các phương thức do thư viện cung cấp Observable.generate() hoặc Observable.fromIterable(). Họ kiểm tra thích hợp.

Kiểm tra này:

Observable<Integer> naturalNumbers = 
     Observable.<Integer, Integer>generate(() -> 1, (s, g) -> { 
      logger.info("generating {}", s); 
      g.onNext(s); 
      return s + 1; 
     }).subscribeOn(Schedulers.newThread()); 
Disposable sub1 = naturalNumbers 
     .subscribe(v -> logger.info("1 got {}", v)); 
Disposable sub2 = naturalNumbers 
     .subscribe(v -> logger.info("2 got {}", v)); 
Disposable sub3 = naturalNumbers 
     .subscribe(v -> logger.info("3 got {}", v)); 

Thread.sleep(100); 

logger.info("unsubscribing..."); 
sub1.dispose(); 
sub2.dispose(); 
sub3.dispose(); 
Thread.sleep(1000); 

logger.info("done"); 
Các vấn đề liên quan