2012-08-11 29 views
10

QUAN TRỌNG: cho một mô tả về kết quả và một số chi tiết, xin vui lòng có một cái nhìn cũng đến câu trả lời của tôiRx vận hành chuỗi riêng biệt

tôi cần phải nhóm và lọc một chuỗi các đối tượng/sự kiện mà thường là sao chép, đệm chúng với một khoảng thời gian TimeSpan. Tôi cố gắng giải thích nó tốt hơn với loại sơ đồ bằng đá cẩm thạch:

X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z 

sẽ tạo ra

X---Y---Z---X---Y---Z 

trong đó X, Y và Z là loại sự kiện khác nhau, và '---' có nghĩa là khoảng thời gian. Ngoài ra, tôi cũng sẽ giống như để phân biệt bởi một tài sản quan trọng mà nó có sẵn trên tất cả các loại bởi vì họ có một lớp cơ sở chung:

X, Y, Z : A 

và A chứa một khóa bất động sản. Sử dụng các ký hiệu Xá nghĩa X.Key = a, Một mẫu cuối cùng sẽ là:

X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c 

sẽ tạo ra

X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b 

Ai có thể giúp tôi đặt lại với nhau các nhà khai thác LINQ cần (có thể là DistinctUntilChanged và Buffer) để đạt được hành vi này? Cảm ơn

CẬP NHẬT 18.08.12:

theo yêu cầu, tôi cố gắng để đưa ra một lời giải thích tốt hơn. Chúng tôi có các thiết bị thu thập và gửi các sự kiện đến một dịch vụ web. Các thiết bị này có một logic cũ (và chúng ta không thể thay đổi nó do khả năng tương thích ngược) và chúng liên tục gửi một sự kiện cho đến khi chúng nhận được một sự thừa nhận; sau khi xác nhận, họ gửi sự kiện tiếp theo trong hàng đợi của họ, v.v. Sự kiện chứa địa chỉ mạng của thiết bị và một số thuộc tính khác phân biệt các sự kiện trong hàng đợi cho từng thiết bị. Một sự kiện trông như thế này:

class Event 
{ 
    public string NetworkAddress { get; } 

    public string EventCode { get; } 

    public string AdditionalAttribute { get; } 
} 

Mục tiêu là xử lý mỗi 5 giây các sự kiện phân biệt nhận được từ tất cả các thiết bị, lưu trữ thông tin trong cơ sở dữ liệu (đó là lý do tại sao chúng tôi không muốn làm điều đó theo lô) và gửi ack vào thiết bị. Hãy làm một ví dụ với chỉ hai thiết bị và một số sự kiện:

Device 'a': 
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x' 
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y' 
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x' 

Device 'b': 
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y' 
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x' 
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y' 
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x' 

Pn are the operations done by our server, explained later 

sơ đồ bằng đá cẩm thạch có thể xảy ra (dòng đầu vào + output stream):

Device 'a'   : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-... 
Device 'b'   : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-... 

Time    : ------------[1s]-----------[2s]------------[3s]------------[4s]- 
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]- 

P1: Server stores and acknowledges [a1] and [b1] 
P2: "  "  " "   [b2] 
P3: "  "  " "   [a2] and [b3] 
P4: "  "  " "   [a3] and [b4] 

Cuối cùng tôi nghĩ rằng nó có lẽ là một sự kết hợp đơn giản của các toán tử cơ bản, nhưng tôi mới sử dụng Rx và tôi hơi bối rối vì dường như có rất nhiều toán tử (hoặc kết hợp các toán tử) để có được luồng đầu ra giống nhau.

Cập nhật 19.08.12:

Hãy ghi nhớ rằng mã này chạy trên một máy chủ và nó sẽ chạy trong nhiều ngày mà không rò rỉ bộ nhớ ... Tôi không chắc chắn về các hành vi của các đối tượng.Hiện tại, đối với mỗi sự kiện, tôi gọi một hoạt động push trên một dịch vụ, nó gọi OnNext của một Subject trên đầu trang mà tôi nên xây dựng truy vấn (nếu tôi không sai về việc sử dụng các đối tượng).

Cập nhật 20.08.12:

thực hiện, bao gồm kiểm tra xác nhận; đây là những gì tôi đã cố gắng và có vẻ như giống nhau được đề xuất bởi @yamen

public interface IEventService 
{ 
    // Persists the events 
    void Add(IEnumerable<Event> events); 
} 

public class Event 
{ 
    public string Description { get; set; } 
} 

/// <summary> 
/// Implements the logic to handle events. 
/// </summary> 
public class EventManager : IDisposable 
{ 
    private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5); 

    private readonly Subject<EventMessage> subject = new Subject<EventMessage>(); 

    private readonly IDisposable subscription; 

    private readonly object locker = new object(); 

    private readonly IEventService eventService; 

    /// <summary> 
    /// Initializes a new instance of the <see cref="EventManager"/> class. 
    /// </summary> 
    /// <param name="scheduler">The scheduler.</param> 
    public EventManager(IEventService eventService, IScheduler scheduler) 
    { 
     this.eventService = eventService; 
     this.subscription = this.CreateQuery(scheduler); 
    } 

    /// <summary> 
    /// Pushes the event. 
    /// </summary> 
    /// <param name="eventMessage">The event message.</param> 
    public void PushEvent(EventMessage eventMessage) 
    { 
     Contract.Requires(eventMessage != null); 
     this.subject.OnNext(eventMessage); 
    } 

    /// <summary> 
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. 
    /// </summary> 
    /// <filterpriority>2</filterpriority> 
    public void Dispose() 
    { 
     this.Dispose(true); 
    } 

    private void Dispose(bool disposing) 
    { 
     if (disposing) 
     { 
      // Dispose unmanaged resources 
     } 

     this.subject.Dispose(); 
     this.subscription.Dispose(); 
    } 

    private IDisposable CreateQuery(IScheduler scheduler) 
    { 
     var buffered = this.subject 
      .DistinctUntilChanged(new EventComparer()) 
      .Buffer(EventHandlingPeriod, scheduler); 

     var query = buffered 
      .Subscribe(this.HandleEvents); 
     return query; 
    } 

    private void HandleEvents(IList<EventMessage> eventMessages) 
    { 
     Contract.Requires(eventMessages != null); 
     var events = eventMessages.Select(this.SelectEvent); 
     this.eventService.Add(events); 
    } 

    private Event SelectEvent(EventMessage message) 
    { 
     return new Event { Description = "evaluated description" }; 
    } 

    private class EventComparer : IEqualityComparer<EventMessage> 
    { 
     public bool Equals(EventMessage x, EventMessage y) 
     { 
      return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute; 
     } 

     public int GetHashCode(EventMessage obj) 
     { 
      var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute); 
      return s.GetHashCode(); 
     } 
    } 
} 

public class EventMessage 
{ 
    public string NetworkAddress { get; set; } 

    public byte EventCode { get; set; } 

    public byte Attribute { get; set; } 

    // Other properties 
} 

Và kiểm tra:

public void PushEventTest() 
    { 
     const string Address1 = "A:2.1.1"; 
     const string Address2 = "A:2.1.2"; 

     var eventServiceMock = new Mock<IEventService>(); 

     var scheduler = new TestScheduler(); 
     var target = new EventManager(eventServiceMock.Object, scheduler); 
     var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 }; 
     var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     scheduler.Schedule(() => target.PushEvent(eventMessageA1)); 
     scheduler.Schedule(TimeSpan.FromSeconds(1),() => target.PushEvent(eventMessageB1)); 
     scheduler.Schedule(TimeSpan.FromSeconds(2),() => target.PushEvent(eventMessageA1)); 

     scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks); 

     eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once()); 

     scheduler.Schedule(TimeSpan.FromSeconds(3),() => target.PushEvent(eventMessageB1)); 

     scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks); 

     eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once()); 
    } 

Ngoài ra, tôi nhận xét một lần nữa rằng nó thực sự quan trọng là các phần mềm có thể chạy trong nhiều ngày mà không cần vấn đề, xử lý hàng nghìn tin nhắn. Để làm cho nó rõ ràng: thử nghiệm không vượt qua với việc thực hiện hiện tại.

+1

Trình tự cuối cùng trong câu hỏi của bạn 'X.a-X.b --- Y.b-Y.c-Z.a-Z.c-Z.b' chỉ thể hiện một' --- 'khoảng thời gian. Đó là đúng hay nên khoảng thời gian giữa mỗi giá trị? – Enigmativity

+0

Sẽ rất hữu ích nếu bạn cung cấp sơ đồ đá cẩm thạch làm nguồn và mục tiêu 'để mở rộng' bên dưới nhau, hoặc cung cấp ví dụ 'thực' cho chúng tôi để giúp bạn. – yamen

+0

Cảm ơn bạn đã nắm bắt Enigmativity, tôi sẽ sửa đầu ra @yamen Tôi sẽ thêm một ví dụ khác – fra

Trả lời

4

Tôi không chắc chắn nếu điều này thực hiện chính xác những gì bạn muốn, nhưng bạn có thể nhóm các yếu tố một cách rõ ràng bằng cách sử dụng từ khóa group, và sau đó để thao tác riêng biệt khác nhau IObservable s trước khi kết hợp lại chúng.

Ví dụ: nếu chúng ta có định nghĩa lớp như

class A 
{ 
    public char Key { get; set; } 
} 

class X : A { } 
... 

Subject<A>

Subject<A> subject = new Subject<A>(); 

sau đó chúng ta có thể viết

var buffered = 
    from a in subject 
    group a by new { Type = a.GetType(), Key = a.Key } into g 
    from buffer in g.Buffer(TimeSpan.FromMilliseconds(300)) 
    where buffer.Any() 
    select new 
    { 
     Count = buffer.Count, 
     Type = buffer.First().GetType().Name, 
     Key = buffer.First().Key 
    }; 

buffered.Do(Console.WriteLine).Subscribe(); 

Chúng ta có thể kiểm tra điều này với các dữ liệu bạn cung cấp:

subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100); 
subject.OnNext(new X { Key = 'b' }); 
Thread.Sleep(100); 
subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100); 
... 
subject.OnCompleted(); 

Đến ge t kết quả bạn đã cung cấp:

{ Count = 2, Type = X, Key = a } 
{ Count = 1, Type = X, Key = b } 
{ Count = 1, Type = Y, Key = b } 
{ Count = 1, Type = Y, Key = c } 
{ Count = 2, Type = Z, Key = a } 
{ Count = 2, Type = Z, Key = c } 
{ Count = 1, Type = Z, Key = b } 
+0

Thậm chí nếu không chính xác 100%, tôi chỉ định ở đây tiền thưởng vì tại thời điểm câu trả lời câu hỏi không đủ rõ ràng. Dù sao, câu trả lời là khá tốt lập luận. – fra

2

Không chắc chắn đây có phải là chính xác những gì bạn muốn, nhưng có vẻ như hỗ trợ các trường hợp sử dụng của bạn.

Trước tiên, hãy xác định các lớp cơ sở để sử dụng (bạn có thể dễ dàng sửa đổi này để phù hợp với nhu cầu của bạn):

public class MyEvent 
{ 
    public string NetworkAddress { set; get; } 
    public string EventCode { set; get; } 
} 

Hãy thiết lập thiết bị của bạn như là một mảng của IObservable<MyEvent> - bạn có thể có những có sẵn khác nhau, và dưới đây sẽ cần phải thay đổi để phù hợp với điều đó tất nhiên. Các thiết bị này sẽ tạo ra một giá trị với độ trễ ngẫu nhiên từ 0,5 đến 1,5 giây.

var deviceA = new MyEvent[] { new MyEvent() {NetworkAddress = "A", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "A", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "A", EventCode = "2"} }; 

var deviceB = new MyEvent[] { new MyEvent() {NetworkAddress = "B", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "2"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "2"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "3"} }; 

var random = new Random();         

var deviceARand = deviceA.ToObservable().Select(a => Observable.Return(a).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat(); 
var deviceBRand = deviceB.ToObservable().Select(b => Observable.Return(b).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat(); 

var devices = new IObservable<MyEvent>[] { deviceARand, deviceBRand }; 

Bây giờ chúng ta hãy xem tất cả các dòng thiết bị cá nhân, làm cho họ 'khác biệt', và ghép lại thành một dòng bậc thầy duy nhất:

var stream = devices.Aggregate(Observable.Empty<MyEvent>(), (acc, device) => acc.DistinctUntilChanged(a => a.EventCode).Merge(device)); 

Một khi bạn đã thấy, nhận được dòng này được tiêu thụ định kỳ chỉ là vấn đề của đệm nó với Buffer:

stream.Buffer(TimeSpan.FromSeconds(1)).Subscribe(x => { /* code here works on a list of the filtered events per second */ }); 
0

Sau khi tìm kiếm và các thí nghiệm, tôi đặt cùng một số mã mà sẽ cho kết quả mà tôi mong đợi:

static void Main(string[] args) 
    { 
     const string Address1 = "A:2.1.1"; 
     const string Address2 = "A:2.1.2"; 
     var comparer = new EventComparer(); 
     var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 }; 
     var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 5 }; 
     var list = new[] { eventMessageA1, eventMessageA1, eventMessageB1, eventMessageA2, eventMessageA1, eventMessageA1 }; 

     var queue = new BlockingCollection<EventMessage>(); 
     Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe 
      (
       l => list.ToList().ForEach(m => 
       { 
        Console.WriteLine("Producing {0} on thread {1}", m, Thread.CurrentThread.ManagedThreadId); 
        queue.Add(m); 
       }) 
      ); 

     // subscribing 
     queue.GetConsumingEnumerable() 
      .ToObservable() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(e => 
       { 
        Console.WriteLine("Queue contains {0} items", queue.Count); 
        e.Distinct(comparer).ToList().ForEach(m => 
        Console.WriteLine("{0} - Consuming: {1} (queue contains {2} items)", DateTime.UtcNow, m, queue.Count)); 
       } 
      ); 

     Console.WriteLine("Type enter to exit"); 
     Console.ReadLine(); 
    } 

    public class EventComparer : IEqualityComparer<EventMessage> 
    { 
     public bool Equals(EventMessage x, EventMessage y) 
     { 
      var result = x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute; 
      return result; 
     } 

     public int GetHashCode(EventMessage obj) 
     { 
      var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute); 
      return s.GetHashCode(); 
     } 
    } 

    public class EventMessage 
    { 
     public string NetworkAddress { get; set; } 

     public byte EventCode { get; set; } 

     public byte Attribute { get; set; } 

     public override string ToString() 
     { 
      const string Format = "{0} ({1}, {2})"; 
      var s = string.Format(Format, this.NetworkAddress, this.EventCode, this.Attribute); 
      return s; 
     } 
    } 

Dù sao, theo dõi ứng dụng, có vẻ như điều này gây ra rò rỉ bộ nhớ. Câu hỏi của tôi hiện là:

  • điều gì gây ra rò rỉ bộ nhớ? [Vui lòng xem nội dung cập nhật bên dưới]
  • là cách tốt nhất để làm điều đó (nếu tôi đặt sự khác biệt trên quan sát đầu tiên, tôi không nhận được các sự kiện khác trên bộ đệm tiếp theo. khác)?
  • làm cách nào để viết bài kiểm tra bằng cách sử dụng trình lên lịch kiểm tra?

CẬP NHẬT:

dường như tăng bộ nhớ chỉ kéo dài vài phút, sau đó giá trị ổn định. Tôi sẽ chạy một bài kiểm tra dài. Tất nhiên, đây sẽ là một hành vi hoàn toàn chấp nhận được.

CẬP NHẬT 26.08.12:

  • như tôi đã đề cập trong bản cập nhật trước đó, sự gia tăng sử dụng bộ nhớ chỉ (và chậm) cho một vài phút sau khi khởi động. Sau 8 giờ bộ nhớ tiêu thụ ổn định, với biến động bình thường trong khoảng vài KB)
  • question này rất giống với tôi và phần mở rộng Xả gợi ý có thể áp dụng tốt cho vấn đề của tôi (vẫn còn để được xác nhận)

Dù sao, tôi nghĩ rằng câu hỏi của tôi vẫn mở cho kiểm tra đơn vị bằng cách sử dụng trình kiểm tra lịch biểu.

nhờ Francesco

Các vấn đề liên quan