var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
Prints
0-5
1-6
2-7
...Làm cách nào để nối nhiều chuỗi IObservable?
Thay vào đó, tôi muốn tham gia vào các giá trị giống hệt
5-5
6-6
7 - 7
8 - 8
...
Đây là ví dụ đơn giản về sự cố khi hợp nhất 100 chuỗi chuỗi không đồng bộ được sắp xếp. Nó là rất dễ dàng để tham gia hai IEnumerable, nhưng tôi không thể tìm thấy một cách để làm một cái gì đó như thế này trong Rx. Ý tưởng nào?
Tìm hiểu thêm về đầu vào và những gì tôi đang cố gắng đạt được. Về cơ bản, toàn bộ hệ thống là một đường ống thời gian thực với nhiều máy trạng thái (bộ tập hợp, bộ đệm, bộ lọc làm mịn, vv) được kết nối bằng mô hình kết nối ngã ba. RX có phù hợp để triển khai những thứ như vậy không? Mỗi đầu vào có thể được biểu diễn như
public struct DataPoint
{
public double Value;
public DateTimeOffset Timestamp;
}
Mỗi bit đầu vào của dữ liệu được timestamped khi đến, do đó tất cả các sự kiện được sắp xếp một cách tự nhiên bởi chủ chốt tham gia của họ (timestamp). Khi các sự kiện đi qua đường ống, họ được chia đôi và tham gia. Tham gia cần phải được tương quan bằng dấu thời gian và được áp dụng theo thứ tự được xác định trước. Ví dụ, tham gia (a, b, c, d) => tham gia (tham gia (tham gia (a, b), c), d).
Chỉnh sửa Dưới đây là những gì tôi có thể đưa ra một cách nhanh chóng. Hy vọng rằng có một giải pháp đơn giản hơn dựa trên các toán tử Rx hiện có.
static void Test()
{
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
//var zip = a.Zip(b, (x, y) => x + "-" + y);
//zip.Subscribe(Console.WriteLine);
var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
joined.Subscribe(Console.WriteLine);
}
static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
return Observable.CreateWithDisposable<string>(o =>
{
Queue<int> a = new Queue<int>();
Queue<int> b = new Queue<int>();
object gate = new object();
left.Subscribe(x =>
{
lock (gate)
{
if (a.Count == 0 || a.Peek() < x)
a.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
right.Subscribe(x =>
{
lock (gate)
{
if (b.Count == 0 || b.Peek() < x)
b.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
return Disposable.Empty;
});
Khi được hỏi cùng một câu hỏi trên [forum rx] (http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –