2016-01-22 24 views
5

Tôi thực sự thích RxJava, đó là một công cụ tuyệt vời nhưng đôi khi rất khó để hiểu cách hoạt động của nó. Chúng tôi sử dụng trang bị thêm với RxJava trong dự án Android của chúng tôi và có trường hợp sử dụng sau:Sử dụng "skipWhile" kết hợp với "repeatWhen" trong RxJava để thực hiện bỏ phiếu máy chủ

Tôi cần thăm dò ý kiến ​​của máy chủ, trong khi máy chủ đang thực hiện một số công việc. Khi máy chủ được thực hiện, tôi phải cung cấp kết quả. Vì vậy, tôi đã thực hiện thành công nó với RxJava, đây là đoạn mã: tôi đã sử dụng "skipWhile" với "repeatWhen"

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob) 
     .skipWhile(new Func1<CheckJobResponse, Boolean>() { 

      @Override 
      public Boolean call(CheckJobResponse checkJobResponse) { 
       boolean shouldSkip = false; 

       if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus()); 

       switch (checkJobResponse.getJobStatus()){ 
        case CheckJobResponse.PROCESSING: 
         shouldSkip = true; 
         break; 
        case CheckJobResponse.DONE: 
        case CheckJobResponse.ERROR: 
         shouldSkip = false; 
         break; 
       } 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip); 

       return shouldSkip; 
      } 
     }) 
     .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { 
      @Override 
      public Observable<?> call(Observable<? extends Void> observable) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable); 
       return observable.delay(1, TimeUnit.SECONDS); 
      } 
     }).subscribe(new Subscriber<CheckJobResponse>(){ 
      @Override 
      public void onNext(CheckJobResponse response) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response); 

      } 

      @Override 
      public void onError(BaseError error) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error); 
       Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show(); 

      } 

      @Override 
      public void onCompleted() { 
       if (SHOW_LOGS) Logger.v(TAG, "onCompleted"); 
      } 
     }); 

Mã này hoạt động tốt:

Khi máy chủ trả lời công việc mà tôi đang xử lý trả về "true" từ chuỗi "skipWhile", Observable ban đầu chờ 1 giây và thực hiện lại yêu cầu http. Quá trình này lặp lại cho đến khi tôi trả về "false" từ chuỗi "skipWhile".

Dưới đây là một vài điều tôi không hiểu:

  1. tôi thấy trong tài liệu của "skipWhile" rằng nó sẽ không phát ra bất cứ điều gì (onerror, onNext, onComplete) từ gốc Quan sát cho đến khi tôi trở lại "sai" từ phương thức "gọi" của nó. Vì vậy, nếu nó không phát ra bất cứ điều gì tại sao "repeatWhen" Quan sát làm công việc của nó? Nó chờ một giây và chạy lại yêu cầu. Ai ra mắt nó?

  2. Câu hỏi thứ hai là: Tại sao có thể quan sát được từ "repeatWhen" không chạy mãi, tôi có nghĩa là tại sao nó dừng lặp lại khi tôi trả về "false" từ "skipWhile"? Tôi nhận được onNext thành công trong Đăng ký của tôi nếu tôi trả về "false".

  3. Trong tài liệu về "repeatWhile", cuối cùng tôi nhận được cuộc gọi tới "onComplete" trong Người đăng ký nhưng "onComplete" không bao giờ được gọi.

  4. Nó không có sự khác biệt nếu tôi thay đổi thứ tự chuỗi "skipWhile" và "repeatWhen". Tại sao vậy ?

Tôi hiểu rằng RxJava là nguồn mở và tôi chỉ có thể đọc mã, nhưng như tôi đã nói - thật khó hiểu.

Cảm ơn.

Trả lời

13

Tôi chưa từng làm việc với repeatWhen trước đây, nhưng câu hỏi này khiến tôi tò mò, vì vậy tôi đã thực hiện một số nghiên cứu.

skipWhilekhông phát ra onErroronCompleted, ngay cả khi nó không bao giờ trả true trước đó. Như vậy, repeatWhen đang được gọi mỗi lần checkJob() phát ra onCompleted. Điều đó trả lời câu hỏi # 1.

Phần còn lại của câu hỏi được dựa trên giả định sai. Đăng ký của bạn đang chạy vĩnh viễn vì repeatWhen của bạn không bao giờ chấm dứt. Đó là bởi vì repeatWhen là một con thú phức tạp hơn bạn nhận ra. Các Observable trong nó phát ra null bất cứ khi nào nó được onCompleted từ nguồn. Nếu bạn lấy nó và trả lại onCompleted thì nó kết thúc, ngược lại nếu bạn phát ra bất cứ thứ gì nó thử lại. Kể từ khi delay chỉ mất một phát thải và trì hoãn nó, nó luôn luôn phát ra các null một lần nữa.Như vậy, nó liên tục đăng ký lại.

Câu trả lời cho # 2, sau đó, là nó đang chạy mãi mãi; có thể bạn đang làm điều gì khác ngoài mã này để hủy đăng ký. Đối với # 3, bạn không bao giờ nhận được onCompleted vì nó không bao giờ hoàn thành. Đối với # 4, thứ tự không quan trọng vì bạn lặp lại vô thời hạn.

Câu hỏi bây giờ là, làm thế nào để bạn có được hành vi đúng? Nó đơn giản như sử dụng takeUntil thay vì skipWhile. Bằng cách đó, bạn tiếp tục lặp lại cho đến khi bạn nhận được kết quả mong muốn, do đó chấm dứt luồng khi bạn muốn kết thúc.

Dưới đây là một mẫu mã:

Observable<Boolean> source = ...; // Something that eventually emits true 

source 
    .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS)) 
    .takeUntil(result -> result) 
    .filter(result -> result) 
    .subscribe(
     res -> System.out.println("onNext(" + res + ")"), 
     err -> System.out.println("onError()"), 
     () -> System.out.println("onCompleted()") 
    ); 

Trong ví dụ này, source được phát boolean. Tôi lặp lại sau mỗi 1 giây cho đến khi nguồn phát ra true. Tôi tiếp tục tham gia cho đến resulttrue. Và tôi lọc ra tất cả các thông báo là false, vì vậy người đăng ký không nhận được thông báo cho đến khi nó là true.

+0

Tôi đã đánh máy: Tôi thấy trong tài liệu "skipWhile" rằng nó sẽ không phát ra bất kỳ thứ gì (onError, onNext, onComplete) từ Original Observable cho đến khi tôi trả về "false" từ phương thức "call". Dưới đây là tài liệu gốc: Trả về một Quan sát có thể bỏ qua tất cả các mục được phát ra bởi nguồn Có thể quan sát được với điều kiện * đúng, nhưng phát ra tất cả các mục nguồn ngay sau khi điều kiện trở thành sai. Vì vậy, tôi nhận được "CHẾ BIẾN" lần đầu tiên và trả về "true" từ phương pháp. Nó không nên phát ra bất cứ thứ gì. Vậy tại sao lại thử lại? –

+0

Về # 2. Không, tôi chỉ có mã hủy đăng ký trong onStop của Hoạt động, nhưng không phải vậy. Tôi sẽ thấy rằng tôi vẫn đang bỏ phiếu cho máy chủ với các cuộc gọi API :) –

+0

Về nhận xét đầu tiên - phát thải của checkJob() có thể là 'onNext (CheckJobResponse)' -> 'onCompleted()'. Nó bỏ qua chuyển tiếp đầu tiên khi nó trả về 'PROCESSING', nhưng' onCompleted() 'vẫn đi qua. (Tôi đã xác nhận điều này bằng cách xem mã nguồn.) –

Các vấn đề liên quan