Tôi đã gặp vấn đề tương tự và không thể tìm thấy giải pháp đã sẵn sàng, do đó tôi đã tạo một giải pháp và tại đây. Ý tưởng là sử dụng BlockingCollection<T>
để thêm các mục cần xử lý và sử dụng Tiện ích mở rộng phản ứng để đăng ký với bộ xử lý có giới hạn tốc độ.
lớp Throttle là phiên bản đổi tên của this rate limiter
public static class BlockingCollectionExtensions
{
// TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed)
public static IObservable<T> AsRateLimitedObservable<T>(this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken)
{
Subject<T> subject = new Subject<T>();
// this is a dummyToken just so we can recreate the TokenSource
// which we will pass the proxy class so it can cancel the task
// on disposal
CancellationToken dummyToken = new CancellationToken();
CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken);
var consumingTask = new Task(() =>
{
using (var throttle = new Throttle(items, timePeriod))
{
while (!sequence.IsCompleted)
{
try
{
T item = sequence.Take(producerToken);
throttle.WaitToProceed();
try
{
subject.OnNext(item);
}
catch (Exception ex)
{
subject.OnError(ex);
}
}
catch (OperationCanceledException)
{
break;
}
}
subject.OnCompleted();
}
}, TaskCreationOptions.LongRunning);
return new TaskAwareObservable<T>(subject, consumingTask, tokenSource);
}
private class TaskAwareObservable<T> : IObservable<T>, IDisposable
{
private readonly Task task;
private readonly Subject<T> subject;
private readonly CancellationTokenSource taskCancellationTokenSource;
public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource)
{
this.task = task;
this.subject = subject;
this.taskCancellationTokenSource = tokenSource;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var disposable = subject.Subscribe(observer);
if (task.Status == TaskStatus.Created)
task.Start();
return disposable;
}
public void Dispose()
{
// cancel consumption and wait task to finish
taskCancellationTokenSource.Cancel();
task.Wait();
// dispose tokenSource and task
taskCancellationTokenSource.Dispose();
task.Dispose();
// dispose subject
subject.Dispose();
}
}
}
kiểm tra Đơn vị:
class BlockCollectionExtensionsTest
{
[Fact]
public void AsRateLimitedObservable()
{
const int maxItems = 1; // fix this to 1 to ease testing
TimeSpan during = TimeSpan.FromSeconds(1);
// populate collection
int[] items = new[] { 1, 2, 3, 4 };
BlockingCollection<int> collection = new BlockingCollection<int>();
foreach (var i in items) collection.Add(i);
collection.CompleteAdding();
IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None);
BlockingCollection<int> processedItems = new BlockingCollection<int>();
ManualResetEvent completed = new ManualResetEvent(false);
DateTime last = DateTime.UtcNow;
observable
// this is so we'll receive exceptions
.ObserveOn(new SynchronizationContext())
.Subscribe(item =>
{
if (item == 1)
last = DateTime.UtcNow;
else
{
TimeSpan diff = (DateTime.UtcNow - last);
last = DateTime.UtcNow;
Assert.InRange(diff.TotalMilliseconds,
during.TotalMilliseconds - 30,
during.TotalMilliseconds + 30);
}
processedItems.Add(item);
},
() => completed.Set()
);
completed.WaitOne();
Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>());
}
}
có điều gì đó không ổn với URL –