2011-02-06 36 views
7
 var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     var zip = a.Zip(b, (x, y) => x + "-" + y); 
     zip.Subscribe(Console.WriteLine); 

Prints
0-5
1-6
2-7
...Làm cách nào để nối nhiều chuỗi IObservable?

Thay vào đó, tôi muốn tham gia vào các giá trị giống hệt
5-5
6-6
7 - 7
8 - 8
...

Đây là ví dụ đơn giản về sự cố khi hợp nhất 100 chuỗi chuỗi không đồng bộ được sắp xếp. Nó là rất dễ dàng để tham gia hai IEnumerable, nhưng tôi không thể tìm thấy một cách để làm một cái gì đó như thế này trong Rx. Ý tưởng nào?

Tìm hiểu thêm về đầu vào và những gì tôi đang cố gắng đạt được. Về cơ bản, toàn bộ hệ thống là một đường ống thời gian thực với nhiều máy trạng thái (bộ tập hợp, bộ đệm, bộ lọc làm mịn, vv) được kết nối bằng mô hình kết nối ngã ba. RX có phù hợp để triển khai những thứ như vậy không? Mỗi đầu vào có thể được biểu diễn như

public struct DataPoint 
{ 
    public double Value; 
    public DateTimeOffset Timestamp; 
} 

Mỗi bit đầu vào của dữ liệu được timestamped khi đến, do đó tất cả các sự kiện được sắp xếp một cách tự nhiên bởi chủ chốt tham gia của họ (timestamp). Khi các sự kiện đi qua đường ống, họ được chia đôi và tham gia. Tham gia cần phải được tương quan bằng dấu thời gian và được áp dụng theo thứ tự được xác định trước. Ví dụ, tham gia (a, b, c, d) => tham gia (tham gia (tham gia (a, b), c), d).

Chỉnh sửa Dưới đây là những gì tôi có thể đưa ra một cách nhanh chóng. Hy vọng rằng có một giải pháp đơn giản hơn dựa trên các toán tử Rx hiện có.

static void Test() 
    { 
     var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     //var zip = a.Zip(b, (x, y) => x + "-" + y); 
     //zip.Subscribe(Console.WriteLine); 

     var joined = MergeJoin(a,b, (x,y) => x + "-" + y); 
     joined.Subscribe(Console.WriteLine); 
    } 

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
    { 
     return Observable.CreateWithDisposable<string>(o => 
      { 
       Queue<int> a = new Queue<int>(); 
       Queue<int> b = new Queue<int>(); 
       object gate = new object(); 

       left.Subscribe(x => 
        { 
         lock (gate) 
         { 
          if (a.Count == 0 || a.Peek() < x) 
           a.Enqueue(x); 

          while (a.Count != 0 && b.Count != 0) 
          { 
           if (a.Peek() == b.Peek()) 
           { 
            o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
           } 
           else if (a.Peek() < b.Peek()) 
           { 
            a.Dequeue(); 
           } 
           else 
           { 
            b.Dequeue(); 
           } 
          } 
         } 
        }); 

       right.Subscribe(x => 
       { 
        lock (gate) 
        { 
         if (b.Count == 0 || b.Peek() < x) 
          b.Enqueue(x); 

         while (a.Count != 0 && b.Count != 0) 
         { 
          if (a.Peek() == b.Peek()) 
          { 
           o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
          } 
          else if (a.Peek() < b.Peek()) 
          { 
           a.Dequeue(); 
          } 
          else 
          { 
           b.Dequeue(); 
          } 
         } 
        } 
       }); 

       return Disposable.Empty; 
      }); 
+0

Khi được hỏi cùng một câu hỏi trên [forum rx] (http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –

Trả lời

1

Tôi thành thật không thể nghĩ của một giải pháp dựa trên các toán tử hiện tại hoạt động cho các nguồn nóng không rõ thứ tự (nghĩa là, xs before ysys before xs). giải pháp của bạn có vẻ tốt đẹp (hey, nếu nó hoạt động), nhưng tôi muốn thực hiện một vài thay đổi nếu nó là mã của tôi:

  • Hỗ trợ hủy đúng cách sử dụng MutableDisposableCompositeDisposable
  • Gọi OnError cho trường hợp ngoại lệ ném từ bộ chọn (làm cho nó nhất quán hơn với những hoạt động khác)
  • Hãy xem xét hỗ trợ hoàn thành nếu nó có thể cho một nguồn để hoàn thành trước kia

Đoạn code dưới đây đã được thử nghiệm với đầu vào kép cấp của bạn, s đầu vào ame lộn, cũng như với Empty<int> + Never<int>:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
{ 
    return Observable.CreateWithDisposable<string>(o => 
    { 
     Queue<int> a = new Queue<int>(); 
     Queue<int> b = new Queue<int>(); 
     object gate = new object(); 

     bool leftComplete = false; 
     bool rightComplete = false; 

     MutableDisposable leftSubscription = new MutableDisposable(); 
     MutableDisposable rightSubscription = new MutableDisposable(); 

     Action tryDequeue =() => 
     { 
      lock (gate) 
      { 
       while (a.Count != 0 && b.Count != 0) 
       { 
        if (a.Peek() == b.Peek()) 
        { 
         string value = null; 

         try 
         { 
          value = selector(a.Dequeue(), b.Dequeue()); 
         } 
         catch (Exception ex) 
         { 
          o.OnError(ex); 
          return; 
         } 

         o.OnNext(value); 
        } 
        else if (a.Peek() < b.Peek()) 
        { 
         a.Dequeue(); 
        } 
        else 
        { 
         b.Dequeue(); 
        } 
       } 
      } 
     }; 

     leftSubscription.Disposable = left.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (a.Count == 0 || a.Peek() < x) 
        a.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      leftComplete = true; 

      if (a.Count == 0 || rightComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     rightSubscription.Disposable = right.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (b.Count == 0 || b.Peek() < x) 
        b.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      rightComplete = true; 

      if (b.Count == 0 || leftComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     return new CompositeDisposable(leftSubscription, rightSubscription); 
    }); 
} 
3

GroupBy có thể làm những gì bạn cần. Có vẻ như bạn không có ràng buộc về thời gian khi các mục nhận được "tham gia", bạn chỉ cần các mục tương tự để được cùng nhau trong một số thời trang.

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15)) 
.GroupBy(k => k) 
.Subscribe(go => go.Count().Where(cnt => cnt > 1) 
          .Subscribe(cnt => 
        Console.WriteLine("Key {0} has {1} matches", go.Key, cnt))); 

Hai điều cần lưu ý về việc trên, Merge có quá tải sau, do đó req của bạn để có hàng trăm con suối tham gia sẽ không trình bày một vấn đề:

Merge<TSource>(params IObservable<TSource>[] sources); 
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources); 
Merge<TSource>(this IObservable<IObservable<TSource>> source); 

Bên cạnh đó, lợi nhuận GroupByIObservable<IGroupedObservable<TKey, TSource>> có nghĩa là bạn có thể phản ứng với từng nhóm và mỗi thành viên mới của mỗi nhóm khi họ tham gia - không cần phải đợi cho đến khi hoàn tất.

+0

Vấn đề duy nhất là tôi cần để có thể tham gia các giá trị theo thứ tự. Tuy nhiên nó có thể được giải quyết nếu thay vì int tôi vượt qua các giá trị index-tuple. –

+0

Ý bạn là gì theo "theo thứ tự"? –

+0

Hãy nhớ rằng bằng cách sử dụng 'Hợp nhất' +' Đếm', bạn sẽ không nhận được bất kỳ kết quả phù hợp nào cho đến khi cả hai chuỗi nguồn kết thúc. Điều này là tốt cho ví dụ 'Phạm vi', nhưng nếu nguồn của bạn là nóng/unending đầu ra có thể không được những gì bạn mong đợi. –

1

Cách sử dụng toán tử Join mới trong v.2838.

var a = Observable.Range(1, 10); 
var b = Observable.Range(5, 10); 

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), 
    (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput)) 
    .Where(tupple => tupple.Item1 == tupple.Item2); 

joinedStream.Subscribe(output => Trace.WriteLine(output)); 

Đây là cái nhìn đầu tiên của tôi tại Join và tôi không chắc chắn nếu nó muốn được khôn ngoan để sử dụng toán tử Never như thế này. Khi đối phó với một khối lượng lớn các yếu tố đầu vào vì nó sẽ phát triển một số lượng lớn opertaions các đầu vào nhiều hơn đã được thu hồi. Tôi nghĩ rằng công việc có thể được thực hiện để đóng các cửa sổ như matche được thực hiện và làm cho giải pháp hiệu quả hơn. Điều đó nói rằng ví dụ trên hoạt động theo câu hỏi của bạn.

Để ghi lại, tôi nghĩ câu trả lời của Scott có lẽ là cách để thực hiện trong trường hợp này. Tôi chỉ ném cái này vào như một giải pháp thay thế tiềm năng.

+0

+1 cho giải pháp với tham gia. Tôi đã dành một giờ hôm qua và không thể làm cho nó hoạt động được. Tôi chia sẻ mối quan tâm của bạn về hiệu suất. Ngoài ra, các mã kết quả là khó hiểu hơn nhiều và khó làm theo so sánh với LINQ tham gia đơn giản. Tôi bắt đầu nghĩ rằng Rx không chỉ là một giải pháp tốt cho loại vấn đề này. –

+0

@Serger - Tôi chắc chắn điều này có thể được thực hiện hiệu quả hơn bằng cách phát ra các giá trị thời lượng khi các kết quả phù hợp được thực hiện (tức là thay thế Observable.Never với thông tin nào đó thông minh hơn một chút). Tất cả đều phụ thuộc vào quy tắc của bạn khi an toàn để hoàn thành thời gian. –

2

Câu trả lời này được sao chép từ các Rx forums, chỉ cần để nó sẽ được lưu trữ ở đây cũng như:

var xs = Observable.Range(1, 10); 
var ys = Observable.Range(5, 10); 

var joined = from x in xs 
    from y in ys 
    where x == y 
    select x + "-" + y; 

Hoặc mà không sử dụng biểu thức truy vấn:

var joined = 
    xs.SelectMany(x => ys, (x, y) => new {x, y}) 
    .Where(t => t.x == t.y) 
    .Select(t => t.x + "-" + t.y); 
+2

Vấn đề duy nhất với giải pháp này là nó yêu cầu 'ys' là nóng (hoặc' Multicast') và không hỗ trợ kịch bản mà giá trị 'ys' bật lên trước giá trị' xs'. –

Các vấn đề liên quan