Tôi đang cố gắng phát ra các sự kiện định kỳ (mỗi 150ms) mặc dù thượng nguồn quan sát được sẽ gửi sự kiện nhanh hơn.MissingBackpressureException ngay cả sau khi gọi trênBackpressureBlock()
Nhưng tôi nhận được MissingBackpressureException
mặc dù tôi đã kêu gọi onBackpressureBlock()
mã:
SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create());
return subject
.subscribeOn(Schedulers.computation())
.doOnSubscribe(() -> {
NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> {
Log.d(TAG, "new quotation " + quotation.hashCode());
NetworkRequestsManager.instance().getSeller(quotation.sellerId)
.subscribe(seller -> {
for (Outlet outlet : seller.outlets) {
if (outlet.latitude != null && outlet.longitude != null)
subject.onNext(new QuotationMarker(outlet, quotation.price));
}
},
error -> Log.fatalError(new RuntimeException(error)));
},
error -> Log.fatalError(new RuntimeException(error)));
})
.doOnError(throwable -> Log.fatalError(new RuntimeException(
"error response in subscribe after doOnSubscribe",
throwable)))
// combine with another observable that emits items regularly (every 100ms)
// so that a new event is received every 100ms :
// also, first event itself is delayed.
.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS),
(seller, aLong) -> seller)
.onBackpressureBlock() // prevent zipWith Observer.interval from throwing MissingBackpressureException s
.doOnError(throwable -> Log.fatalError(new RuntimeException(
"error response after onBackpressureBlock()",
throwable))); // <-- error is thrown here
dấu vết:
05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock()
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234)
05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException
05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ ... 10 more
PS: Log.fatalError(err)
chỉ là wrapper của tôi xung quanh Android.util.Log.e(...)
EDIT
Sau nhiều lần dùng thử và lỗi, điều này đang trở thành wont fix
cho tôi. zipWith(Observable.interval...)
có vẻ như là thủ phạm và một lỗi khuôn khổ có thể xảy ra. Loại bỏ những dòng (và do đó tính năng phát xạ định kỳ của tôi) mã của tôi hoạt động. Tôi đang sử dụng một chủ đề có thể gọi onNext
từ các chủ đề khác nhau và sau đó tôi đang thực hiện các toán tử không thể xử lý trên nó.
Tôi đã thử điều đó.Vẫn như vậy – vedant1811
Bạn đã thử cả hai? Từ mô tả của bạn, thật khó để biết cái nào quá nhanh. – zsxwing