QUAN TRỌNG: cho một mô tả về kết quả và một số chi tiết, xin vui lòng có một cái nhìn cũng đến câu trả lời của tôiRx vận hành chuỗi riêng biệt
tôi cần phải nhóm và lọc một chuỗi các đối tượng/sự kiện mà thường là sao chép, đệm chúng với một khoảng thời gian TimeSpan. Tôi cố gắng giải thích nó tốt hơn với loại sơ đồ bằng đá cẩm thạch:
X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z
sẽ tạo ra
X---Y---Z---X---Y---Z
trong đó X, Y và Z là loại sự kiện khác nhau, và '---' có nghĩa là khoảng thời gian. Ngoài ra, tôi cũng sẽ giống như để phân biệt bởi một tài sản quan trọng mà nó có sẵn trên tất cả các loại bởi vì họ có một lớp cơ sở chung:
X, Y, Z : A
và A chứa một khóa bất động sản. Sử dụng các ký hiệu Xá nghĩa X.Key = a, Một mẫu cuối cùng sẽ là:
X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c
sẽ tạo ra
X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b
Ai có thể giúp tôi đặt lại với nhau các nhà khai thác LINQ cần (có thể là DistinctUntilChanged và Buffer) để đạt được hành vi này? Cảm ơn
CẬP NHẬT 18.08.12:
theo yêu cầu, tôi cố gắng để đưa ra một lời giải thích tốt hơn. Chúng tôi có các thiết bị thu thập và gửi các sự kiện đến một dịch vụ web. Các thiết bị này có một logic cũ (và chúng ta không thể thay đổi nó do khả năng tương thích ngược) và chúng liên tục gửi một sự kiện cho đến khi chúng nhận được một sự thừa nhận; sau khi xác nhận, họ gửi sự kiện tiếp theo trong hàng đợi của họ, v.v. Sự kiện chứa địa chỉ mạng của thiết bị và một số thuộc tính khác phân biệt các sự kiện trong hàng đợi cho từng thiết bị. Một sự kiện trông như thế này:
class Event
{
public string NetworkAddress { get; }
public string EventCode { get; }
public string AdditionalAttribute { get; }
}
Mục tiêu là xử lý mỗi 5 giây các sự kiện phân biệt nhận được từ tất cả các thiết bị, lưu trữ thông tin trong cơ sở dữ liệu (đó là lý do tại sao chúng tôi không muốn làm điều đó theo lô) và gửi ack vào thiết bị. Hãy làm một ví dụ với chỉ hai thiết bị và một số sự kiện:
Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'
Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'
Pn are the operations done by our server, explained later
sơ đồ bằng đá cẩm thạch có thể xảy ra (dòng đầu vào + output stream):
Device 'a' : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b' : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...
Time : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-
P1: Server stores and acknowledges [a1] and [b1]
P2: " " " " [b2]
P3: " " " " [a2] and [b3]
P4: " " " " [a3] and [b4]
Cuối cùng tôi nghĩ rằng nó có lẽ là một sự kết hợp đơn giản của các toán tử cơ bản, nhưng tôi mới sử dụng Rx và tôi hơi bối rối vì dường như có rất nhiều toán tử (hoặc kết hợp các toán tử) để có được luồng đầu ra giống nhau.
Cập nhật 19.08.12:
Hãy ghi nhớ rằng mã này chạy trên một máy chủ và nó sẽ chạy trong nhiều ngày mà không rò rỉ bộ nhớ ... Tôi không chắc chắn về các hành vi của các đối tượng.Hiện tại, đối với mỗi sự kiện, tôi gọi một hoạt động push trên một dịch vụ, nó gọi OnNext của một Subject trên đầu trang mà tôi nên xây dựng truy vấn (nếu tôi không sai về việc sử dụng các đối tượng).
Cập nhật 20.08.12:
thực hiện, bao gồm kiểm tra xác nhận; đây là những gì tôi đã cố gắng và có vẻ như giống nhau được đề xuất bởi @yamen
public interface IEventService
{
// Persists the events
void Add(IEnumerable<Event> events);
}
public class Event
{
public string Description { get; set; }
}
/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);
private readonly Subject<EventMessage> subject = new Subject<EventMessage>();
private readonly IDisposable subscription;
private readonly object locker = new object();
private readonly IEventService eventService;
/// <summary>
/// Initializes a new instance of the <see cref="EventManager"/> class.
/// </summary>
/// <param name="scheduler">The scheduler.</param>
public EventManager(IEventService eventService, IScheduler scheduler)
{
this.eventService = eventService;
this.subscription = this.CreateQuery(scheduler);
}
/// <summary>
/// Pushes the event.
/// </summary>
/// <param name="eventMessage">The event message.</param>
public void PushEvent(EventMessage eventMessage)
{
Contract.Requires(eventMessage != null);
this.subject.OnNext(eventMessage);
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
{
// Dispose unmanaged resources
}
this.subject.Dispose();
this.subscription.Dispose();
}
private IDisposable CreateQuery(IScheduler scheduler)
{
var buffered = this.subject
.DistinctUntilChanged(new EventComparer())
.Buffer(EventHandlingPeriod, scheduler);
var query = buffered
.Subscribe(this.HandleEvents);
return query;
}
private void HandleEvents(IList<EventMessage> eventMessages)
{
Contract.Requires(eventMessages != null);
var events = eventMessages.Select(this.SelectEvent);
this.eventService.Add(events);
}
private Event SelectEvent(EventMessage message)
{
return new Event { Description = "evaluated description" };
}
private class EventComparer : IEqualityComparer<EventMessage>
{
public bool Equals(EventMessage x, EventMessage y)
{
return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
}
public int GetHashCode(EventMessage obj)
{
var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
return s.GetHashCode();
}
}
}
public class EventMessage
{
public string NetworkAddress { get; set; }
public byte EventCode { get; set; }
public byte Attribute { get; set; }
// Other properties
}
Và kiểm tra:
public void PushEventTest()
{
const string Address1 = "A:2.1.1";
const string Address2 = "A:2.1.2";
var eventServiceMock = new Mock<IEventService>();
var scheduler = new TestScheduler();
var target = new EventManager(eventServiceMock.Object, scheduler);
var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
scheduler.Schedule(() => target.PushEvent(eventMessageA1));
scheduler.Schedule(TimeSpan.FromSeconds(1),() => target.PushEvent(eventMessageB1));
scheduler.Schedule(TimeSpan.FromSeconds(2),() => target.PushEvent(eventMessageA1));
scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);
eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());
scheduler.Schedule(TimeSpan.FromSeconds(3),() => target.PushEvent(eventMessageB1));
scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);
eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
}
Ngoài ra, tôi nhận xét một lần nữa rằng nó thực sự quan trọng là các phần mềm có thể chạy trong nhiều ngày mà không cần vấn đề, xử lý hàng nghìn tin nhắn. Để làm cho nó rõ ràng: thử nghiệm không vượt qua với việc thực hiện hiện tại.
Trình tự cuối cùng trong câu hỏi của bạn 'X.a-X.b --- Y.b-Y.c-Z.a-Z.c-Z.b' chỉ thể hiện một' --- 'khoảng thời gian. Đó là đúng hay nên khoảng thời gian giữa mỗi giá trị? – Enigmativity
Sẽ rất hữu ích nếu bạn cung cấp sơ đồ đá cẩm thạch làm nguồn và mục tiêu 'để mở rộng' bên dưới nhau, hoặc cung cấp ví dụ 'thực' cho chúng tôi để giúp bạn. – yamen
Cảm ơn bạn đã nắm bắt Enigmativity, tôi sẽ sửa đầu ra @yamen Tôi sẽ thêm một ví dụ khác – fra