2015-05-05 23 views
6

Tôi đang cố gắng phát ra các sự kiện định kỳ (mỗi 150ms) mặc dù thượng nguồn quan sát được sẽ gửi sự kiện nhanh hơn.MissingBackpressureException ngay cả sau khi gọi trênBackpressureBlock()

Nhưng tôi nhận được MissingBackpressureException mặc dù tôi đã kêu gọi onBackpressureBlock()

mã:

SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create()); 

    return subject 
      .subscribeOn(Schedulers.computation()) 
      .doOnSubscribe(() -> { 
       NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> { 
          Log.d(TAG, "new quotation " + quotation.hashCode()); 
          NetworkRequestsManager.instance().getSeller(quotation.sellerId) 
            .subscribe(seller -> { 
               for (Outlet outlet : seller.outlets) { 
                if (outlet.latitude != null && outlet.longitude != null) 
                 subject.onNext(new QuotationMarker(outlet, quotation.price)); 
               } 
              }, 
              error -> Log.fatalError(new RuntimeException(error))); 
         }, 
         error -> Log.fatalError(new RuntimeException(error))); 

      }) 
      .doOnError(throwable -> Log.fatalError(new RuntimeException(
        "error response in subscribe after doOnSubscribe", 
        throwable))) 
        // combine with another observable that emits items regularly (every 100ms) 
        // so that a new event is received every 100ms : 
        // also, first event itself is delayed. 
      .zipWith(Observable.interval(150, TimeUnit.MILLISECONDS), 
        (seller, aLong) -> seller) 
      .onBackpressureBlock() // prevent zipWith Observer.interval from throwing MissingBackpressureException s 
      .doOnError(throwable -> Log.fatalError(new RuntimeException(
        "error response after onBackpressureBlock()", 
        throwable))); // <-- error is thrown here 

dấu vết:

05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock() 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234) 
    05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153) 
    05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException 
    05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349) 
    05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330) 
    05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ ... 10 more 

PS: Log.fatalError(err) chỉ là wrapper của tôi xung quanh Android.util.Log.e(...)

EDIT

Sau nhiều lần dùng thử và lỗi, điều này đang trở thành wont fix cho tôi. zipWith(Observable.interval...) có vẻ như là thủ phạm và một lỗi khuôn khổ có thể xảy ra. Loại bỏ những dòng (và do đó tính năng phát xạ định kỳ của tôi) mã của tôi hoạt động. Tôi đang sử dụng một chủ đề có thể gọi onNext từ các chủ đề khác nhau và sau đó tôi đang thực hiện các toán tử không thể xử lý trên nó.

Trả lời

2

Tôi nghĩ (nhưng tôi không chắc chắn) rằng vấn đề là cấu hình backpressure của bạn là sau khi các nhà điều hành zip.

Toán tử zip cần phải đệm các mục của một Observable để nén nó bằng một số khác Observable. Đây là bộ đệm này nên ném ngoại lệ của bạn. (xem here)

Để giải quyết vấn đề của bạn, tôi nghĩ bạn nên cố gắng thêm cấu hình áp đảo vào một (hoặc trên mỗi) Observable được sử dụng trong toán tử zip.

dụ:

obs.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS).onBackPressureDrop()); 

obs.onBackPressureBlock().zipWith(Observable.interval(150, TimeUnit.MILLISECONDS)); 
+0

Tôi đã thử điều đó.Vẫn như vậy – vedant1811

+0

Bạn đã thử cả hai? Từ mô tả của bạn, thật khó để biết cái nào quá nhanh. – zsxwing

2

Câu trả lời trên từ @dwursteisen và @zsxwing là đúng.

Toán tử khoảng thời gian là toán tử phát ra dựa trên thời gian và do đó "nóng" và không hỗ trợ áp suất ngược. Do đó, nó sẽ tiếp tục phát ra và điền vào bộ đệm bên trong của zip gây ra MissingBackpressureException.

Khi xử lý nguồn "nóng" (chẳng hạn như dựa trên thời gian hoặc sự kiện của người dùng), bạn phải chọn chiến lược về cách xử lý tràn.

Trong trường hợp này, bạn cần phải đặt chiến lược đó trên toán tử interval.

Dưới đây là mã hiển thị những gì đang xảy ra và các tùy chọn để đối phó với nó:

import java.util.concurrent.TimeUnit; 

import rx.Observable; 


public class ZipInterval { 

    public static void main(String... args) { 
     Observable<Long> slowHotSource = Observable.interval(1, TimeUnit.SECONDS); 

     /** This one is fast and hot so will cause a MissingBackpressureException. 
     * 
     * This is because a "hot" source based on time does not obey backpressure 
     * and keeps emitting regardless of what the downstream asks for. 
     * 
     * Examples of "hot" and "cold" and approaches to both can be found at: 
     * https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=90 and 
     * https://github.com/ReactiveX/RxJava/wiki/Backpressure 
     * */ 
     // Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS); 

     /** 
     * The following version of 'fastHotSource' composes a simple flow control strategy. 
     */ 
     Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS).onBackpressureDrop(); 

     Observable<String> zipped = Observable.zip(slowHotSource, fastHotSource, (s, f) -> { 
      return s + " " + f; 
     }); 

     // subscribe to the output 
     System.out.println("---- zip output"); 
     zipped.take(10).toBlocking().forEach(System.out::println); 

     /** 
     * The outcome of the above is probably not what is expected though. 
     * 
     * This is because zip will buffer the output and then `fastHotSource` will drop until 
     * the zip buffer asks for more. 
     * 
     * For temporal or "hot" sources like this, using `withLatestFrom` or `combineLatest` 
     * is often more appropriate than `zip`. 
     */ 

     Observable<String> latest = slowHotSource.withLatestFrom(fastHotSource, (s, f) -> { 
      return s + " " + f; 
     }); 

     // subscribe to the output 
     System.out.println("---- latest output"); 
     latest.take(10).toBlocking().forEach(System.out::println); 
    } 
} 

Kết quả của việc này là:

---- zip output 
0 0 
1 1 
2 2 
3 3 
4 4 
5 5 
6 6 
7 7 
8 8 
9 9 
---- latest output 
0 1002 
1 2002 
2 3000 
3 4001 
4 5003 
5 6001 
6 7000 
7 8002 
8 9005 
9 10000 
0

Cố gắng sử dụng combineLatest vì kết hợp mới nhất không chờ mới các giá trị để gọi onNext, nó sử dụng các giá trị mới nhất khi một giá trị mới đến hàm

Các vấn đề liên quan