Điều bạn mô tả tất nhiên là hàng đợi ưu tiên.
Rx là tất cả về luồng sự kiện, thay vì hàng đợi. Tất nhiên, hàng đợi là được sử dụng nhiều trong Rx - nhưng chúng không phải là khái niệm hạng nhất, phần chi tiết hơn về chi tiết triển khai của các khái niệm của Rx.
Một ví dụ tốt về nơi chúng tôi cần hàng đợi là đối phó với người quan sát chậm. Các sự kiện được gửi đi tuần tự trong Rx và nếu các sự kiện đến nhanh hơn người quan sát có thể xử lý chúng, thì chúng phải được xếp hàng đợi với người quan sát đó. Nếu có nhiều người quan sát, thì nhiều hàng đợi logic phải được duy trì, vì các nhà quan sát có thể tiến triển ở các bước khác nhau - và Rx chọn không giữ chúng trong bước khóa.
"Áp suất ngược" là khái niệm về các nhà quan sát cung cấp phản hồi cho các quan sát để cho phép các cơ chế xử lý áp suất của một quan sát nhanh hơn - chẳng hạn như giảm phát hoặc điều chỉnh. Rx không có cách thức đầu tiên để giới thiệu áp suất ngược - chỉ được xây dựng trong có nghĩa là một quan sát viên quan sát có thể quan sát được thông qua bản chất đồng bộ của OnNext
. Bất kỳ cơ chế nào khác sẽ cần phải nằm ngoài băng tần. Câu hỏi của bạn liên quan trực tiếp đến áp lực ngược, vì nó chỉ có liên quan trong trường hợp người quan sát chậm.
Tôi đề cập đến tất cả những điều này để cung cấp bằng chứng cho tuyên bố của tôi rằng Rx không phải là lựa chọn tuyệt vời để cung cấp loại công văn ưu tiên mà bạn đang tìm kiếm - thực sự, cơ chế xếp hàng đầu tiên có vẻ phù hợp hơn.
Để giải quyết vấn đề trong tầm tay, bạn cần tự quản lý xếp hàng ưu tiên, trong toán tử tùy chỉnh. Để giải quyết vấn đề: những gì bạn đang nói là nếu các sự kiện đến trong quá trình xử lý sự kiện OnNext
, chẳng hạn như có sự kiện để gửi, thì thay vì hàng đợi FIFO điển hình mà Rx sử dụng, bạn muốn công văn dựa trên một số ưu tiên.
Điều cần lưu ý là tinh thần Rx không giữ nhiều người quan sát trong bước khóa, các nhà quan sát đồng thời sẽ có khả năng thấy các sự kiện theo một thứ tự khác, có thể hoặc không có vấn đề gì đối với bạn. Bạn có thể sử dụng một cơ chế như Publish
để có được sự nhất quán trật tự - nhưng có thể bạn không muốn làm điều này vì thời gian phân phối sự kiện sẽ nhận được khá khó đoán và không hiệu quả trong trường hợp đó.
tôi chắc chắn rằng có những cách tốt hơn để làm điều này, nhưng đây là một ví dụ về một giao ưu tiên hàng đợi dựa - bạn có thể mở rộng này để làm việc cho nhiều con suối và những ưu tiên (ưu tiên hoặc thậm chí mỗi sự kiện) sử dụng một thực hiện hàng đợi tốt hơn (chẳng hạn như hàng đợi ưu tiên dựa trên b-tree) nhưng tôi đã chọn để duy trì điều này khá đơn giản. Thậm chí sau đó, lưu ý số lượng đáng kể các mối quan tâm mà mã phải giải quyết, xung quanh việc xử lý lỗi, hoàn thành, v.v. - và tôi đã lựa chọn khi chúng được báo hiệu rằng chắc chắn có nhiều lựa chọn hợp lệ khác.
Tất cả trong tất cả, việc triển khai này chắc chắn đặt tôi khỏi ý tưởng sử dụng Rx cho điều này. Đó là đủ phức tạp mà có lẽ có lỗi ở đây anyway. Như tôi đã nói, có thể là mã gọn gàng cho việc này (đặc biệt là cho các nỗ lực tối thiểu Tôi đã đặt vào nó!), Nhưng khái niệm, tôi thấy không thoải mái với ý tưởng không phụ thuộc vào thực hiện:
public static class ObservableExtensions
{
public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
this IObservable<TSource> source,
IObservable<TSource> lowPriority,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<TSource>(o => {
// BufferBlock from TPL dataflow is used as it is
// handily awaitable. package: Microsoft.Tpl.Dataflow
var loQueue = new BufferBlock<TSource>();
var hiQueue = new BufferBlock<TSource>();
var errorQueue = new BufferBlock<Exception>();
var done = new TaskCompletionSource<int>();
int doneCount = 0;
Action incDone =() => {
var dc = Interlocked.Increment(ref doneCount);
if(dc == 2)
done.SetResult(0);
};
source.Subscribe(
x => hiQueue.Post(x),
e => errorQueue.Post(e),
incDone);
lowPriority.Subscribe(
x => loQueue.Post(x),
e => errorQueue.Post(e),
incDone);
return scheduler.ScheduleAsync(async(ctrl, ct) => {
while(!ct.IsCancellationRequested)
{
TSource nextItem;
if(hiQueue.TryReceive(out nextItem)
|| loQueue.TryReceive(out nextItem))
o.OnNext(nextItem);
else if(done.Task.IsCompleted)
{
o.OnCompleted();
return;
}
Exception error;
if(errorQueue.TryReceive(out error))
{
o.OnError(error);
return;
}
var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);
var loAvailableAsync = loQueue.OutputAvailableAsync(ct);
var errAvailableAsync =
errorQueue.OutputAvailableAsync(ct);
await Task.WhenAny(
hiAvailableAsync,
loAvailableAsync,
errAvailableAsync,
done.Task);
}
});
});
}
}
Và ví dụ về cách sử dụng:
void static Main()
{
var xs = Observable.Range(0, 3);
var ys = Observable.Range(10, 3);
var source = ys.MergeWithLowPriorityStream(xs);
source.Subscribe(Console.WriteLine,() => Console.WriteLine("Done"));
}
Trước tiên, điều này sẽ cho biết mức ưu tiên cao hơn của chúng.
Vì vậy, bàn tay quan sát "thấp" (bất kể đoạn mã mới này) là một giá trị. Mã này có ý nghĩa gì? Chờ mãi để xem liệu "cao" có thể quan sát được * bao giờ * tạo ra một giá trị khác không? Tôi đang đấu tranh để xem các ưu tiên có thể hoạt động như thế nào và chỉ tiếp tục quay lại sử dụng 'Hợp nhất' và bỏ qua các ưu tiên. –
@Damien_The_Unbeliever, ý tưởng về cơ bản là "Hợp nhất" nhưng với các mục ưu tiên cao bỏ qua phía trước. Điều đó có ý nghĩa? Tôi muốn xử lý cả hai nguồn mục trong cùng một cách, do đó Hợp nhất và đăng ký không biết về mức độ ưu tiên. Nhưng đó là thứ tự mà họ đến nơi mà tôi đang tập trung vào - hãy tưởng tượng tôi có 20 mục ưu tiên thấp sắp tới, và một mục ưu tiên cao mới được gửi đi. Tôi muốn điều đó được phát ra tiếp theo, không phải sau 20 mục khác. –
Nhưng bạn chỉ từng thấy một mục tại một thời điểm. Nếu ưu tiên "thấp" đã vượt qua bạn năm mục cho đến thời điểm này và * dự định * sẽ gửi cho bạn thêm 15 mục nữa, thì sao? Và bạn chắc chắn không có cách nào phát hiện ra thực tế đó hoàn toàn từ các quan sát được. –