Câu trả lời từ @michalsamek có vẻ đúng, mặc dù backpressure chỉ hoạt động cho Lưu lượng. Tôi đã sửa lại thuê bao của anh ta, để nó làm những gì được yêu cầu.
Cũng có một vấn đề nhỏ khi sử dụng nó trong các vụ nổ tại các thời điểm khác nhau.
private static <T> FlowableOperator<T, T> allowPerMillis(int millis) {
return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis);
}
Observable.range(1, 100)
.observeOn(Schedulers.io())
.toFlowable(BackpressureStrategy.BUFFER)
.compose(Flowable::onBackpressureBuffer)
.lift(allowPerMillis(200))
.subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value));
public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> {
private final Subscriber<T> upstream;
private final int millis;
// If there hasn't been a request for a long time, do not flood
private final AtomicBoolean shouldRequest = new AtomicBoolean(true);
public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) {
this.upstream = upstream;
this.millis = millis;
}
@Override
public void onSubscribe(Subscription subscription) {
Observable
.interval(millis, TimeUnit.MILLISECONDS)
.subscribe(x -> {
if (shouldRequest.getAndSet(false))
subscription.request(1);
});
}
@Override
public void onNext(T t) {
shouldRequest.set(true);
upstream.onNext(t);
}
@Override
public void onError(Throwable throwable) {
upstream.onError(throwable);
}
@Override
public void onComplete() {
upstream.onComplete();
}
}
Bạn có muốn 'delay' hoặc' throttleFirst (throttleLast) '? Sau đó sẽ thả các mục nếu nhận được các mục quá nhanh. – zsxwing