2014-11-14 23 views
6

Tôi có thể quan sát để tạo dữ liệu từ luồng nhanh từ một con trỏ cơ sở dữ liệu. Tôi đang tìm cách để tăng tốc đầu ra trên một tỷ lệ x mục mỗi giây. Cho đến nay tôi đã sử dụng callstack chặn như mô tả trên các tài liệu:Các quan sát giới hạn tốc độ

observable.map(f -> { 
ratelimiter.acquire(); // configured limiter to only allow 
}); 

này đang làm việc tốt, nhưng chỉ vì tò mò là có một cách tốt hơn để xử lý này sử dụng backpressure?

Tks

+0

Bạn có muốn 'delay' hoặc' throttleFirst (throttleLast) '? Sau đó sẽ thả các mục nếu nhận được các mục quá nhanh. – zsxwing

Trả lời

2

Bạn có thể thử sử dụng rx.Observable#onBackpressureBuffer() kết hợp với một thuê bao tùy chỉnh mà theo định kỳ sẽ yêu cầu n mục mỗi thứ hai. Tuy nhiên, bạn sẽ bị ràng buộc để cứng lấy mẫu thứ hai.

Lưu ý.subscribeOn().toBlocking() chỉ để làm cho phương pháp chính không thoát ngay lập tức.

public class BackpressureTest { 

    public static void main(final String[] args) { 
    Observable.range(1, 1000) 
     .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it 
     .lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second 
     .subscribeOn(Schedulers.computation()) 
     .toBlocking() 
     .subscribe(System.out::println); 
    } 

    private static <T> Observable.Operator<T, T> allowPerSecond(final int n) { 
    return upstream -> periodicallyRequestingSubscriber(upstream, n); 
    } 

    private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) { 
    return new Subscriber<T>() { 

     @Override 
     public void onStart() { 
     request(0); // request 0 so that source stops emitting 
     Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items 
     } 

     @Override 
     public void onCompleted() { 
     upstream.onCompleted(); 
     } 

     @Override 
     public void onError(final Throwable e) { 
     upstream.onError(e); 
     } 

     @Override 
     public void onNext(final T integer) { 
     upstream.onNext(integer); 
     } 
    }; 
    } 
} 
0

Câu trả lời từ @michalsamek có vẻ đúng, mặc dù backpressure chỉ hoạt động cho Lưu lượng. Tôi đã sửa lại thuê bao của anh ta, để nó làm những gì được yêu cầu.

Cũng có một vấn đề nhỏ khi sử dụng nó trong các vụ nổ tại các thời điểm khác nhau.

private static <T> FlowableOperator<T, T> allowPerMillis(int millis) { 
    return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis); 
} 


Observable.range(1, 100) 
    .observeOn(Schedulers.io()) 
    .toFlowable(BackpressureStrategy.BUFFER) 
    .compose(Flowable::onBackpressureBuffer) 
    .lift(allowPerMillis(200)) 
    .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value)); 



public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> { 

    private final Subscriber<T> upstream; 

    private final int millis; 

    // If there hasn't been a request for a long time, do not flood 
    private final AtomicBoolean shouldRequest = new AtomicBoolean(true); 

    public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) { 
     this.upstream = upstream; 
     this.millis = millis; 
    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     Observable 
       .interval(millis, TimeUnit.MILLISECONDS) 
       .subscribe(x -> { 
        if (shouldRequest.getAndSet(false)) 
         subscription.request(1); 
       }); 
} 

@Override 
public void onNext(T t) { 
    shouldRequest.set(true); 
    upstream.onNext(t); 
} 

@Override 
public void onError(Throwable throwable) { 
    upstream.onError(throwable); 
} 

@Override 
public void onComplete() { 
    upstream.onComplete(); 
} 
} 
Các vấn đề liên quan