2011-01-19 31 views
7

Tôi muốn sử dụng Tiện ích mở rộng phản ứng để chuyển đổi một số thư và chuyển tiếp chúng sau một chút chậm trễ.Trì hoãn và hủy trùng lặp bằng cách sử dụng Tiện ích mở rộng phản ứng (Rx)

Các thông điệp giống như thế này:

class InMsg 
{ 
    int GroupId { get; set; } 
    int Delay { get; set; } 
    string Content { get; set; } 
} 

Kết quả trông giống như sau:

class OutMsg 
{ 
    int GroupId { get; set; } 
    string Content { get; set; } 
    OutMsg(InMsg in) 
    { 
     GroupId = in.GroupId; 
     Content = Transform(in.Content); // function omitted 
    } 
} 

Có một vài yêu cầu:

  • Chiều dài của sự chậm trễ phụ thuộc vào nội dung của tin nhắn.
  • Mỗi thư có một GroupId
  • Nếu thư mới đi kèm với cùng một GroupId như thư bị trì hoãn đang chờ truyền thì thư đầu tiên sẽ bị hủy và thư thứ hai được truyền sau một khoảng thời gian trễ mới.

Với một Quan sát <InMsg> và một Send chức năng:

IObservable<InMsg> inMsgs = ...; 

void Send(OutMsg o) 
{ 
    ... // publishes transformed messages 
} 

Tôi hiểu rằng tôi có thể sử dụng Select để thực hiện việc chuyển đổi.

void SetUp() 
{ 
    inMsgs.Select(i => new OutMsg(i)).Subscribe(Send); 
} 
  • Làm thế nào tôi có thể áp dụng một thông điệp rõ trì hoãn? (Lưu ý điều này có thể/nên dẫn đến việc không gửi thư theo thứ tự.)
  • Làm cách nào để loại bỏ các thư có cùng GroupId?
  • Rx có khả năng giải quyết vấn đề này không?
  • Có cách nào khác để giải quyết vấn đề này không?

Trả lời

7

Bạn có thể sử dụng GroupBy để thực hiện một IGroupedObservable, Delay để trì hoãn việc đầu ra, và Switch để đảm bảo giá trị mới thay thế giá trị trước đó trong nhóm của họ:

IObservable<InMsg> inMessages; 

inMessages 
    .GroupBy(msg => msg.GroupId) 
    .Select(group => 
     { 
      return group.Select(groupMsg => 
       { 
        TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay); 
        OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here 

        return Observable.Return(outMsg).Delay(delay); 
       }) 
       .Switch(); 
     }) 
     .Subscribe(outMsg => Console.Write("OutMsg received")); 

Một lưu ý về việc thực hiện: nếu một giá trị được nhóm đến sau thư được gửi (tức là sau khi trì hoãn), nó sẽ bắt đầu một sự chậm trễ mới

+0

Tôi đã chơi với điều này và nó không hoàn toàn làm những gì tôi mong đợi. Các thuê bao nhận được một "System.Collections.Generic.AnonymousObservable'1 [OutMsg]" – chillitom

+0

Có vẻ như bạn không gọi 'Switch'.Nếu bạn "di chuột qua" 'Chọn' trong Visual Studio, nó sẽ cho bạn biết rằng nó trả về một' IObservable '. Nếu nó trả về 'IObservable >', bạn không gọi Switch –

0

Khuôn khổ Rx giải quyết sự chậm trễ bằng cách sử dụngPhương pháp mở rộng 210. Việc xếp hàng với de-duplication có thể được giải quyết bằng cách áp dụng một loại LINQ thông thường sau Delay, sau đó thực hiện một DistinctUntilChanged.

Cập nhật: Tôi thừa nhận, cách tiếp cận chậm trễ ở đây sẽ không hoạt động một mình. Bạn bằng cách nào đó cần phải xếp hàng các tin nhắn đến trong sự chậm trễ mà bạn đang theo sau. Điều này được thực hiện theo phương pháp mở rộng BufferWithTime. Phương thức này sẽ trả về danh sách các thư, sau đó bạn có thể bóc vỏ các bản sao trước khi xuất bản chúng cho người quan sát tiếp theo trong dòng.

Các vấn đề liên quan