2016-11-24 17 views
5

MỤC TIÊU:Quan sát dòng từ StackExchange Redis thuê bao Pub Sub

Tôi đang sử dụng StackExchange Redis Client. Mục tiêu của tôi là tạo luồng quan sát từ Người đăng ký phụ Pub được Khách hàng tiếp xúc, sau đó có thể hỗ trợ 1-n đăng ký theo Quan sát, mỗi người có bộ lọc riêng của họ qua LINQ. (Nhà xuất bản đang làm việc theo kế hoạch, vấn đề là hoàn toàn xung quanh Subscription để Stream tổ chức sự kiện trên một kênh cụ thể.)

BỐI CẢNH:

Tôi đang sử dụng Redis Pub Sub như là một phần của một ứng dụng tổ chức sự kiện nguồn gốc hay CQRS. Trường hợp sử dụng cụ thể là xuất bản Sự kiện cho nhiều người đăng ký, sau đó cập nhật các Mô hình đã đọc khác nhau, gửi email, v.v.

Mỗi người đăng ký này cần lọc loại sự kiện họ xử lý và tôi muốn sử dụng Rx .Net (Tiện ích mở rộng phản ứng), với LINQ, để cung cấp tiêu chí lọc trên Luồng sự kiện, để xử lý hiệu quả chỉ phản ứng với Sự kiện quan tâm. Sử dụng phương pháp này loại bỏ nhu cầu đăng ký Trình xử lý với việc triển khai xe buýt sự kiện và cho phép tôi thêm các dự đoán mới vào hệ thống bằng cách triển khai 1-n Các dịch vụ nhỏ có mỗi số 1-n Quan sát được đăng ký vào Luồng sự kiện. bộ lọc cụ thể.

GÌ Tôi đã thử:

1) Tôi đã tạo ra một lớp kế thừa từ ObservableBase, trọng phương pháp SubscribeCore, mà nhận được yêu cầu đăng ký từ quan sát, lưu trữ chúng trong một ConcurrentDictionary, và vì mỗi thông báo Redis đến từ kênh, lặp qua các thuê bao quan sát đã đăng ký và gọi phương thức OnNext của họ qua RedisValue.

2) Tôi đã tạo Chủ đề, cũng chấp nhận đăng ký từ Đài quan sát và gọi phương thức OnNext của chúng. Một lần nữa, việc sử dụng các chủ đề dường như được nhiều người tán thành.

SỰ CỐ:

Các phương pháp tôi đã cố gắng làm chức năng (ít nhất là bề ngoài), với mức độ khác nhau về hiệu suất, nhưng feel like a hack, và rằng tôi không sử dụng Rx trong cách thức mà nó được dự định.

Tôi thấy nhiều nhận xét rằng các phương pháp có thể quan sát được tích hợp nên được sử dụng ở tất cả, ví dụ Observable.FromEvent, nhưng dường như không thể thực hiện với API đăng ký của StackExchange Redis Clients, ít nhất là với đôi mắt của tôi .

Tôi cũng hiểu rằng các phương pháp ưa thích để nhận stream và chuyển tiếp để nhiều Các nhà quan sát là sử dụng một ConnectableObservable, điều này dường như được thiết kế cho các kịch bản rất tôi phải đối mặt (Mỗi Microservice sẽ nội bộ có 1 -n Quan sát được đăng ký). Hiện tại, tôi không thể tìm được cách kết nối một số ConnectableObservable với thông báo từ StackExchange Redis hoặc nếu nó cung cấp lợi ích thực tế trên Quan sát được.

CẬP NHẬT:

Mặc dù hoàn thành không phải là một vấn đề trong kịch bản của tôi (Xử lý được tốt), xử lý lỗi quan trọng; ví dụ. phát hiện lỗi cách ly trong một người đăng ký để ngăn tất cả các đăng ký chấm dứt.

Trả lời

8

Dưới đây là một phương pháp mở rộng mà bạn có thể sử dụng để tạo ra một IObservable<RedisValue> từ một ISubscriberRedisChannel:

public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel) 
{ 
    return Observable.Create<RedisValue>(async (obs, ct) => 
    { 
     await subscriber.SubscribeAsync(channel, (_, message) => 
     { 
      obs.OnNext(message); 
     }).ConfigureAwait(false); 

     return Disposable.Create(() => subscriber.Unsubscribe(channel)); 
    }); 
} 

Vì không có hoàn thành kênh Redis kết quả IObservable sẽ không bao giờ hoàn tất, tuy nhiên bạn có thể thả IDisposable đăng ký hủy đăng ký kênh Redis (điều này sẽ được thực hiện tự động bởi nhiều nhà khai thác Rx).

Cách sử dụng có thể là như vậy:

var subscriber = connectionMultiplexer.GetSubscriber(); 

var gotMessage = await subscriber.WhenMessageReceived("my_channel") 
    .AnyAsync(msg => msg == "expected_message") 
    .ToTask() 
    .ConfigureAwait(false); 

Hoặc theo ví dụ của bạn:

var subscriber = connectionMultiplexer.GetSubscriber(); 

var sendEmailEvents = subscriber.WhenMessageReceived("my_channel") 
    .Select(msg => ParseEventFromMessage(msg)) 
    .Where(evt => evt.Type == EventType.SendEmails); 

await sendEmailEvents.ForEachAsync(evt => 
{ 
    SendEmails(evt); 
}).ConfigureAwait(false); 

microservices khác có thể lọc khác nhau.

+0

Gọn gàng, tôi thích cách tiếp cận, +1. (Trước khi chấp nhận câu trả lời, tôi muốn chờ một vài ngày để cho phép người khác có cơ hội.) Bạn có thấy bất kỳ lợi thế nào trong việc sử dụng ConnectableObservable trên một Quan sát được không, như đã đề cập trong câu hỏi của tôi; hầu hết các nguồn mô tả các ConnectableObservable như được thiết kế cho một kịch bản mà một dòng được đọc, và sau đó xuất bản cho nhiều quan sát phụ. Mặc dù hoàn thành không phải là một vấn đề trong kịch bản của tôi (Xử lý là tốt), xử lý lỗi là quan trọng; cách ly chúng để ngăn tất cả các đăng ký chấm dứt. – dmcquiggin

+0

Nếu bạn muốn chia sẻ cùng một thuê bao với nhiều người quan sát thì bạn sẽ cần phải sử dụng một 'IConnectableObservable', có thể được thực hiện bằng cách sử dụng' .Publish() 'hoặc gọi một' khác .Multicast() 'hiện thực trên kết quả của 'WhenMessageReceived'. Nó không nên thay đổi cách 'WhenMessageReceived' được thực hiện ở tất cả. – Lukazoid

+0

Có, thử nghiệm và hiệu suất tuyệt vời với 'WhenMessageReceived' trả về một' IConnectableObservable' và 10 đăng ký đồng thời với 'IConnectableObservable' đó. Giải pháp tốt. – dmcquiggin

Các vấn đề liên quan