Tôi muốn tạo quan sát mà sau:RXJava - làm cho một pausable thể quan sát được (với bộ đệm và cửa sổ chẳng hạn)
- đệm tất cả các mặt hàng, trong khi họ đang dừng
- ngay lập tức phát ra các mặt hàng, trong khi họ đang không bị tạm dừng
- trình kích hoạt tạm dừng/tiếp tục phải đến từ một số khác có thể quan sát được
- nó phải được lưu để sử dụng bởi các quan sát không chạy trên luồng chính và phải lưu thay đổi trạng thái tạm dừng/tiếp tục từ chuỗi chính
Tôi muốn sử dụng BehaviorSubject<Boolean>
làm trình kích hoạt và liên kết trình kích hoạt này với sự kiện onResume
và của hoạt động. (Mã số ví dụ nối)
Câu hỏi
tôi đã thiết lập một cái gì đó, nhưng nó không hoạt động như dự kiến. Tôi sử dụng nó như sau:
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
Hiện nay vấn đề là, rằng Variant 1 nên hoạt động tốt, nhưng đôi khi, những sự kiện đó được không phát ra - van không được phát ra, cho đến khi tất cả mọi thứ van đang làm việc (có thể một vấn đề luồng ...)! Giải pháp 2 là đơn giản hơn nhiều và dường như làm việc, nhưng tôi không chắc chắn nếu nó thực sự là tốt hơn, tôi không nghĩ như vậy. Tôi thực sự không chắc chắn, tại sao giải pháp một là không đôi khi vì vậy tôi không chắc chắn nếu giải pháp 2 giải quyết vấn đề (hiện tại cho tôi chưa biết) ...
Ai đó có thể cho tôi biết điều gì có thể là vấn đề hoặc nếu giải pháp đơn giản nên hoạt động đáng tin cậy? Hay chỉ cho tôi một giải pháp đáng tin cậy?
Mã
RxValue
https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08
chức năng RXPauser
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}
private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while emission is paused
Observable<T> sharedSource = source.publish().refCount();
Observable<T> queue = sharedSource
.buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
.flatMap(l -> Observable.from(l))
.doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));
// this observable emits all items that are emitted while emission is not paused
Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
.switchMap(tObservable -> tObservable)
.doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));
// combine both observables
return queue.mergeWith(window)
.doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
Hoạt động
public class BaseActivity extends AppCompatActivity {
private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);
public BaseActivity(Bundle savedInstanceState)
{
super(args);
final Class<?> clazz = this.getClass();
pauser
.doOnUnsubscribe(() -> {
L.d(clazz, "Pauser unsubscribed!");
})
.subscribe(aBoolean -> {
L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
});
}
public PublishSubject<Boolean> getPauser()
{
return pauser;
}
@Override
protected void onResume()
{
super.onResume();
pauser.onNext(true);
}
@Override
protected void onPause()
{
pauser.onNext(false);
super.onPause();
}
}
dân cố gắng để trả lời câu hỏi này được, cho đến nay, thiếu một yêu cầu quan trọng là được làm rất rõ ràng trong câu hỏi: _" trình kích hoạt tạm dừng/tiếp tục phải đến từ một "_. Họ không muốn có một lịch trình thời gian cố định. –