2017-01-24 16 views
5

Tôi đang gặp khó khăn khi tìm một ví dụ về làm thế nào để làm cho một nhà điều hành tùy chỉnh với RxJava 2. Tôi đã xem xét một vài phương pháp:Tạo Toán tử Tùy chỉnh trong RxJava2?

  1. Sử dụng Observable.create, và sau đó flatMap ing vào nó từ nguồn quan sát được. Tôi có thể làm việc này, nhưng nó không hoàn toàn đúng. Tôi sẽ tạo ra một hàm tĩnh mà tôi cung cấp nguồn Observable và sau đó là flatMap trên nguồn. Trong OnSubscribe, sau đó tôi khởi tạo một đối tượng mà tôi truyền phát đến, xử lý và quản lý Observable/Emitter (vì nó không tầm thường, và tôi muốn mọi thứ được đóng gói càng tốt).
  2. Tạo một ObservableOperator và cung cấp cho Observable.lift. Tôi không thể tìm thấy bất kỳ ví dụ về điều này cho RxJava 2. Tôi đã phải gỡ lỗi vào ví dụ của riêng tôi để đảm bảo rằng sự hiểu biết của tôi về thượng lưu và hạ nguồn là chính xác. Bởi vì tôi không thể tìm thấy bất kỳ ví dụ hoặc tài liệu về điều này cho RxJava 2 Tôi là một chút lo lắng tôi có thể vô tình làm một cái gì đó tôi không phải.
  3. Tạo loại Observable của riêng tôi. Điều này có vẻ là cách các toán tử cơ bản hoạt động, nhiều trong số đó mở rộng AbstractObservableWithUpstream. Mặc dù có rất nhiều thứ đang diễn ra ở đây, và có vẻ như dễ dàng bỏ sót một điều gì đó hoặc làm điều gì đó mà tôi không nên làm. Tôi không chắc mình có nên tiếp cận như thế này hay không. Tôi bước vào bản thân mình thông qua quá trình tinh thần, và nó có vẻ như nó có thể nhận được lông khá nhanh chóng.

Tôi sẽ tiếp tục với tùy chọn # 2, nhưng bạn nên hỏi phương pháp được hỗ trợ để thực hiện điều này là gì trong RxJava2 và cũng tìm hiểu xem có bất kỳ tài liệu hoặc ví dụ nào cho điều này không.

+0

Đối # 2, tôi nghi ngờ đó là cơ chế tương tự được sử dụng để tạo ra tất cả các nhà khai thác bản địa rx' ví dụ 'buffer',' window', v.v. Vì vậy, bạn có thể đi đến github và tìm mã nguồn cho những cái đó, để xem chúng được triển khai như thế nào. – Luciano

+0

@Luciano # 3 là cách tất cả các nhà khai thác bản địa rx được thực hiện. Như tôi đã đề cập ở trên từ việc nhìn vào mã nguồn, nó được lông thực sự nhanh chóng. Rất nhiều phương pháp trợ giúp được gọi ở khắp mọi nơi.'RxAssembly',' DisposableHelper', và chúng được sử dụng một cách chính xác. Nó chắc chắn có thể được thực hiện, nhưng tôi chỉ cần sao chép một định dạng. Tôi muốn chắc chắn rằng tôi hiểu những gì đang xảy ra trong mã của tôi. – spierce7

Trả lời

1

Các toán tử viết không được khuyến nghị cho người mới bắt đầu và có thể đạt được nhiều mẫu lưu lượng mong muốn thông qua các toán tử hiện có.

Bạn đã xem wiki của RxJava về writing operators for 2.x chưa? Tôi đề nghị đọc nó từ trên xuống dưới.

  1. sử dụng create() là có thể nhưng hầu hết mọi người sử dụng nó để phát ra các yếu tố của một List với một for-each vòng lặp, không nhận ra rằng Flowable.fromIterable nào đó.
  2. Chúng tôi giữ điểm mở rộng này mặc dù các nhà khai thác RxJava 2 không tự sử dụng lift(). Nếu bạn muốn tránh một số boilerplate với tùy chọn 3. sau đó bạn có thể thử this route.
  3. Đây là cách các nhà khai thác RxJava 2 được triển khai. AbstractObservableWithUpstream là một sự thuận tiện nhỏ và không cần thiết cho external implementors.
1

Điều này có thể giúp bạn. Tôi thực hiện toán tử RxJava2 để xử lý APiError. Tôi đã sử dụng nhà điều hành thang máy.

Xem ví dụ.

public final class ApiClient implements ApiClientInterface { 
    ... 
     @NonNull 
     @Override 
     public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) { 
      return myApiService.activate(email, emailData) 
        .lift(getApiErrorTransformer()) 
        .subscribeOn(Schedulers.io()); 
     } 

     private <T>ApiErrorOperator<T> getApiErrorTransformer() { 
      return new ApiErrorOperator<>(gson, networkService); 
     } 

    } 

Và sau đó bạn có thể tìm thấy nhà điều hành tùy chỉnh

public final class ApiErrorOperator<T> implements ObservableOperator<T, T> { 
     private static final String TAG = "ApiErrorOperator"; 
     private final Gson gson; 
     private final NetworkService networkService; 

     public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) { 
      this.gson = gson; 
      this.networkService = networkService; 
     } 

     @Override 
     public Observer<? super T> apply(Observer<? super T> observer) throws Exception { 
      return new Observer<T>() { 
       @Override 
       public void onSubscribe(Disposable d) { 
        observer.onSubscribe(d); 
       } 

       @Override 
       public void onNext(T value) { 
        observer.onNext(value); 
       } 

       @Override 
       public void onError(Throwable e) { 
        Log.e(TAG, "onError", e); 

       if (e instanceof HttpException) { 
         try { 
          HttpException error = (HttpException) e; 
          Response response = error.response(); 
          String errorBody = response.errorBody().string(); 

          ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class); 
          ApiException exception = new ApiException(errorResponse, response); 

          observer.onError(exception); 
         } catch (IOException exception) { 
          observer.onError(exception); 
         } 

        } else if (!networkService.isNetworkAvailable()) { 
         observer.onError(new NetworkException(ErrorResponse.builder() 
           .setErrorCode("") 
           .setDescription("No Network Connection Error") 
           .build())); 
        } else { 
         observer.onError(e); 
        } 
       } 

       @Override 
       public void onComplete() { 
        observer.onComplete(); 
       } 
      }; 
     } 
    } 
Các vấn đề liên quan