2016-09-25 22 views
16

Tôi muốn tạo quan sát mà sau:RXJava - làm cho một pausable thể quan sát được (với bộ đệm và cửa sổ chẳng hạn)

  • đệm tất cả các mặt hàng, trong khi họ đang dừng
  • ngay lập tức phát ra các mặt hàng, trong khi họ đang không bị tạm dừng
  • trình kích hoạt tạm dừng/tiếp tục phải đến từ một số khác có thể quan sát được
  • nó phải được lưu để sử dụng bởi các quan sát không chạy trên luồng chính và phải lưu thay đổi trạng thái tạm dừng/tiếp tục từ chuỗi chính

Tôi muốn sử dụng BehaviorSubject<Boolean> làm trình kích hoạt và liên kết trình kích hoạt này với sự kiện onResume và của hoạt động. (Mã số ví dụ nối)

Câu hỏi

tôi đã thiết lập một cái gì đó, nhưng nó không hoạt động như dự kiến. Tôi sử dụng nó như sau:

Observable o = ...; 
// Variant 1 
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) 
// Variant 2 
// o = o.compose(RXPauser.applyPauser(getPauser())); 
o 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(); 

Hiện nay vấn đề là, rằng Variant 1 nên hoạt động tốt, nhưng đôi khi, những sự kiện đó được không phát ra - van không được phát ra, cho đến khi tất cả mọi thứ van đang làm việc (có thể một vấn đề luồng ...)! Giải pháp 2 là đơn giản hơn nhiều và dường như làm việc, nhưng tôi không chắc chắn nếu nó thực sự là tốt hơn, tôi không nghĩ như vậy. Tôi thực sự không chắc chắn, tại sao giải pháp một là không đôi khi vì vậy tôi không chắc chắn nếu giải pháp 2 giải quyết vấn đề (hiện tại cho tôi chưa biết) ...

Ai đó có thể cho tôi biết điều gì có thể là vấn đề hoặc nếu giải pháp đơn giản nên hoạt động đáng tin cậy? Hay chỉ cho tôi một giải pháp đáng tin cậy?

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

chức năng RXPauser

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser) 
{ 
    return observable -> pauser(observable, pauser); 
} 

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser) 
{ 
    // this observable buffers all items that are emitted while emission is paused 
    Observable<T> sharedSource = source.publish().refCount(); 
    Observable<T> queue = sharedSource 
      .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed)) 
      .flatMap(l -> Observable.from(l)) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t)); 

    // this observable emits all items that are emitted while emission is not paused 
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed)) 
      .switchMap(tObservable -> tObservable) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t)); 

    // combine both observables 
    return queue.mergeWith(window) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t)); 
} 

Hoạt động

public class BaseActivity extends AppCompatActivity { 

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false); 

    public BaseActivity(Bundle savedInstanceState) 
    { 
     super(args); 
     final Class<?> clazz = this.getClass(); 
     pauser 
       .doOnUnsubscribe(() -> { 
        L.d(clazz, "Pauser unsubscribed!"); 
       }) 
       .subscribe(aBoolean -> { 
        L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED")); 
       }); 
    } 

    public PublishSubject<Boolean> getPauser() 
    { 
     return pauser; 
    } 

    @Override 
    protected void onResume() 
    { 
     super.onResume(); 
     pauser.onNext(true); 
    } 

    @Override 
    protected void onPause() 
    { 
     pauser.onNext(false); 
     super.onPause(); 
    } 
} 
+0

dân cố gắng để trả lời câu hỏi này được, cho đến nay, thiếu một yêu cầu quan trọng là được làm rất rõ ràng trong câu hỏi: _" trình kích hoạt tạm dừng/tiếp tục phải đến từ một "_. Họ không muốn có một lịch trình thời gian cố định. –

Trả lời

3

Bạn thực sự có thể sử dụng .buffer() hành đi qua nó quan sát, xác định khi nào nên dừng lại đệm, mẫu từ cuốn sách:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10) 
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) 
    .subscribe(System.out::println); 

từ chương 5, 'Thuần hoá chuỗi': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

Bạn có thể sử dụng PublishSubject làm Observable để cấp dữ liệu cho các yếu tố trong toán tử tùy chỉnh của mình. Mỗi khi bạn cần bắt đầu lưu vào bộ đệm, hãy tạo cá thể bằng cách Observable.defer(() -> createBufferingValve())

2

Tôi đã thực hiện tương tự cho các sự kiện ghi nhật ký.
Chủ đề thu thập một số sự kiện và một lần trong 10 giây sẽ đẩy chúng vào máy chủ.

Ý tưởng chính là, ví dụ bạn có lớp Event.

public class Event { 

    public String jsonData; 

    public String getJsonData() { 
     return jsonData; 
    } 

    public Event setJsonData(String jsonData) { 
     this.jsonData = jsonData; 
     return this; 
    } 
} 

Bạn nên tạo hàng đợi cho các sự kiện:

private PublishSubject<Event> eventQueue = PublishSubject.create(); 

Nó có thể BehaviorSubject, nó không quan trọng

Sau đó, bạn nên tạo logic, mà sẽ xử lý đẩy các sự kiện đến máy chủ :

eventObservable = eventQueue.asObservable() 
      .buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds 
      .toList() 
      .doOnNext(new Action1<List<Event>>() { 
       @Override 
       public void call(List<Event> events) { 
        apiClient.pushEvents(events);  //push your event 
       } 
      }) 
      .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() { 
       @Override 
       public Observable<List<Event>> call(Throwable throwable) { 
        return null; //make sure, that on error will be never called 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(Schedulers.io()); 

Sau đó, bạn nên đăng ký và giữ lại nhánh con ription, cho đến khi bạn không cần nó:

eventSubscription = eventObservable.subscribe() 

Home này giúp

Các vấn đề liên quan