2012-06-25 20 views
9

Gần đây tôi đã đọc về IObservable. Cho đến nay, tôi đã xem xét các câu hỏi SO khác nhau và xem video về những gì họ có thể làm. Toàn bộ cơ chế "đẩy" tôi nghĩ là rực rỡ, nhưng tôi vẫn đang cố gắng tìm ra chính xác mọi thứ. Từ các bài đọc của tôi, tôi đoán theo cách một số IObservable là thứ có thể được 'theo dõi' và IObservers là 'người theo dõi'.Sử dụng IObservable thay vì các sự kiện

Vì vậy, bây giờ tôi đang cố gắng thực hiện điều này trong ứng dụng của mình. Có một vài điều tôi muốn giải quyết trước khi tôi bắt đầu. Tôi đã thấy rằng IObservable là đối diện của IEnumerable, tuy nhiên, tôi không thể thực sự nhìn thấy bất kỳ nơi nào trong trường hợp cụ thể của tôi mà tôi có thể kết hợp vào ứng dụng của tôi.

Hiện tại, tôi sử dụng nhiều sự kiện, rất nhiều thứ mà tôi có thể thấy 'hệ thống ống nước' đang bắt đầu trở nên không thể quản lý được. Tôi nghĩ, IObservable có thể giúp tôi ở đây.

Hãy xem xét các thiết kế dưới đây, gọi là wrapper của tôi xung quanh tôi tôi/O trong ứng dụng của tôi (FYI, tôi thường phải đối phó với chuỗi):

Tôi có một giao diện cơ sở được gọi là IDataIO:

public interface IDataIO 
{ 
    event OnDataReceived; 
    event OnTimeout: 
    event OnTransmit; 
} 

Hiện tại, tôi hiện có ba lớp triển khai giao diện này, mỗi lớp trong một số cách sử dụng các cuộc gọi phương thức Async, giới thiệu một số loại xử lý đa luồng:

public class SerialIO : IDataIO; 
public class UdpIO : IDataIO; 
public class TcpIO : IDataIO; 

Có một trường hợp duy nhất của mỗi người trong các lớp học gói lên thành lớp cuối cùng của tôi, được gọi là IO (mà cũng thực hiện IDataIO - tôn trọng những mô hình chiến lược của tôi):

public class IO : IDataIO 
{ 
    public SerialIO Serial; 
    public UdpIO Udp; 
    public TcpIO Tcp; 
} 

Tôi đã sử dụng mô hình chiến lược để đóng gói các ba lớp, để khi thay đổi giữa các phiên bản IDataIO khác nhau trong thời gian chạy, làm cho nó 'ẩn' đối với người dùng cuối. Như bạn có thể tưởng tượng, điều này đã dẫn đến khá một chút 'sự kiện ống nước' trong nền.

Vì vậy, làm cách nào tôi có thể sử dụng thông báo 'đẩy' ở đây trong trường hợp của tôi? Thay vì đăng ký các sự kiện (DataReceived etc) tôi muốn chỉ đơn giản là đẩy dữ liệu đến bất cứ ai quan tâm. Tôi là một chút không chắc chắn về nơi để bắt đầu. Tôi vẫn đang cố gắng để đồ chơi với các ý tưởng/lớp học chung của Subject, và các hóa thân khác nhau này (ReplaySubject/AsynSubject/BehaviourSubject). Ai đó có thể xin vui lòng khai sáng cho tôi về điều này (có thể với tham chiếu đến thiết kế của tôi)? Hay đơn giản đây không phải là sự phù hợp lý tưởng cho IObservable?

PS. Vui lòng chỉnh sửa bất kỳ 'hiểu lầm' nào của tôi :)

Trả lời

8

Quan sát là tuyệt vời để biểu diễn luồng dữ liệu, vì vậy sự kiện DataReceived của bạn sẽ trở nên độc đáo với mẫu quan sát được, chẳng hạn như IObservable<byte> hoặc IObservable<byte[]>. Bạn cũng nhận được lợi ích bổ sung của OnErrorOnComplete tiện lợi.

Về mặt triển khai, thật khó để nói cho trường hợp chính xác của bạn nhưng chúng tôi thường sử dụng Subject<T> làm nguồn cơ bản và gọi OnNext để đẩy dữ liệu.Có lẽ cái gì đó như

// Using a subject is probably the easiest way to push data to an Observable 
// It wraps up both IObservable and IObserver so you almost never use IObserver directly 
private readonly Subject<byte> subject = new Subject<byte>(); 

private void OnPort_DataReceived(object sender, EventArgs e) 
{ 
    // This pushes the data to the IObserver, which is probably just a wrapper 
    // around your subscribe delegate is you're using the Rx extensions 
    this.subject.OnNext(port.Data); // pseudo code 
} 

Sau đó, bạn có thể tiếp xúc với các đối tượng thông qua một tài sản:

public IObservable<byte> DataObservable 
{ 
    get { return this.subject; } // Or this.subject.AsObservable(); 
} 

Bạn có thể thay thế sự kiện DataReceived của bạn trên IDataIO với một IObservable<T> và có mỗi lớp chiến lược xử lý dữ liệu của họ trong bất cứ cách họ cần và đẩy đến số Subject<T>.

Ở phía bên kia, bất cứ ai đăng ký vào các Quan sát là sau đó có thể hoặc là xử lý nó như một sự kiện (chỉ bằng cách sử dụng một Action<byte[]>) hoặc bạn có thể thực hiện một số công việc thực sự hữu ích trên dòng với Select, Where, Buffer, vv .

private IDataIO dataIo = new ... 

private void SubscribeToData() 
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes); 
} 

private void On16Bytes(IList<byte> bytes) 
{ 
    // do stuff 
} 

ReplaySubject/ConnectableObservable s là tuyệt vời khi bạn biết thuê bao của bạn sẽ được đến muộn để đảng nhưng vẫn cần phải bắt kịp trên tất cả các sự kiện. Các nguồn lưu trữ tất cả mọi thứ nó đẩy và replay tất cả mọi thứ cho mỗi thuê bao. Chỉ có bạn mới có thể nói rằng đó là hành vi bạn thực sự cần (nhưng hãy cẩn thận vì nó sẽ lưu trữ tất cả mọi thứ mà sẽ tăng sử dụng bộ nhớ của bạn rõ ràng).

Khi tôi đang tìm hiểu về Rx tôi thấy http://leecampbell.blogspot.co.uk/ loạt blog trên Rx là rất nhiều thông tin để hiểu được lý thuyết (các bài viết là một chút ngày nay và các API đã thay đổi vì vậy xem ra cho điều đó)

+0

Hi RichK, bạn có thể xin hãy giải thích về quyền sở hữu Chủ đề? Điều này được tuyên bố như thế nào? Và người dùng của lớp này, làm thế nào chính xác họ sẽ 'đăng ký' với IObservable 'DataReceived'. – Simon

+0

@Simon Tôi đã thực hiện một vài chỉnh sửa, cho tôi biết nếu bạn vẫn không chắc chắn :) – RichK

+0

Cảm ơn, điều đó sẽ xóa một vài điều. Chỉ cần 1 điều, tôi giả sử 'dataIo.DataObservable' là' public IObservable DataObservable'? – Simon

4

Đây là chắc chắn là một trường hợp lý tưởng cho các quan sát. Lớp học IO có thể sẽ được cải thiện nhiều nhất. Để bắt đầu, hãy thay đổi giao diện để sử dụng các quan sát và xem lớp kết hợp trở nên đơn giản như thế nào.

public interface IDataIO 
{ 
    //you will have to fill in the types here. Either the event args 
    //the events provide now or byte[] or something relevant would be good. 
    IObservable<???> DataReceived; 
    IObservable<???> Timeout; 
    IObservable<???> Transmit; 
} 

public class IO : IDataIO 
{ 
    public SerialIO Serial; 
    public UdpIO Udp; 
    public TcpIO Tcp; 

    public IObservable<???> DataReceived 
    { 
     get 
     { 
      return Observable.Merge(Serial.DataReceived, 
            Udp.DataReceived, 
            Tcp.DataReceived); 
     } 
    } 

    //similarly for other two observables 
} 

SIDE LƯU Ý: Bạn có thể nhận thấy rằng tôi đã thay đổi tên thành viên giao diện. Trong các sự kiện .NET thường có tên là <event name> và các hàm nâng cao chúng được gọi là On<event name>.

Đối với các lớp sản xuất, bạn có một vài tùy chọn phụ thuộc vào nguồn thực tế. Giả sử bạn đang sử dụng lớp .NET SerialPort trong số SerialIO và rằng DataReceived trả về một IObservable<byte[]>. Kể từ khi SerialPort đã có một sự kiện cho dữ liệu nhận được, bạn có thể sử dụng trực tiếp để làm cho các quan sát bạn cần.

public class SerialIO : IDataIO 
{ 
    private SerialPort _port; 

    public IObservable<byte[]> DataRecived 
    { 
     get 
     { 
      return Observable.FromEventPattern<SerialDataReceivedEventHandler, 
               SerialDataReceivedEventArgs>(
         h => _port.DataReceived += h, 
         h => _port.DataReceived -= h) 
        .Where(ep => ep.EventArgs.EventType == SerialData.Chars) 
        .Select(ep => 
          { 
           byte[] buffer = new byte[_port.BytesToRead]; 
           _port.Read(buffer, 0, buffer.Length); 
           return buffer; 
          }); 
     } 
    } 
} 

Đối với trường hợp bạn không có nguồn sự kiện hiện tại, bạn có thể cần phải sử dụng chủ đề như RichK đề xuất. Câu trả lời của anh ấy bao gồm mô hình sử dụng khá tốt, vì vậy tôi sẽ không lặp lại ở đây.

Bạn không hiển thị cách bạn sử dụng giao diện này, nhưng tùy thuộc vào trường hợp sử dụng, có thể có ý nghĩa hơn khi có các chức năng khác trên các lớp này tự trả lại và tự mình loại bỏ hoàn toàn các sự kiện này. Với mô hình async dựa trên sự kiện, bạn phải có các sự kiện riêng biệt với hàm bạn gọi để kích hoạt công việc, nhưng với các quan sát, bạn có thể trả về chúng từ hàm thay vì làm rõ hơn những gì bạn đang đăng ký. Cách tiếp cận đó cũng cho phép các quan sát được trả về từ mỗi cuộc gọi để gửi các thông báo OnErrorOnCompleted để báo hiệu kết thúc một thao tác. Dựa trên việc bạn sử dụng một lớp kết hợp, tôi không mong đợi điều này hữu ích trong trường hợp cụ thể này, nhưng đó là điều cần ghi nhớ.

+0

+1 Cảm ơn, có một số thông tin tốt ở đây.Chỉ trên câu lệnh 'Merge()' - điều này kết hợp một chuỗi các quan sát thành 1 - Trong ứng dụng của tôi, tôi sẽ chỉ sử dụng một trong (serial/udp/tcp) cùng một lúc và cho phép người dùng chuyển đổi giữa các giao diện (do đó tình trạng khó xử của tôi với hệ thống ống nước sự kiện). Điều này có được đề xuất ở đây để hợp nhất các quan sát không? Đánh giá cao liên kết đến các sự kiện nối tiếp Async :) – Simon

+0

@Simon Bạn có thể áp dụng 'Where' cho các quan sát từ các lớp được bọc khi hợp nhất chúng để kiểm tra" CurrentSource "hoặc thuộc tính tương tự để lọc ra các thư không mong muốn. –

+1

@Simon hoặc bạn có thể (tốt hơn) dừng việc sản xuất tin nhắn trên các quan sát khác để 'Hợp nhất' chỉ nhận được một lần tại một thời điểm. Nếu đúng như vậy, có lẽ tốt hơn nên sử dụng 'Chuyển' thay vì' Hợp nhất'. – yamen

0

Một câu trả lời chung cho: các sự kiện để sự kiện phản ứng:

từ iobservables của tôi thường bắt đầu với một sự thay đổi sở hữu:

nếu sự kiện này là một sự thay đổi trong tài sản người ta có thể sử dụng:

IObservable<string> obs= user 
     .ItemPropertyChanged(u=>u.Name,false) 

bằng cách tham khảo các dll từ bài viết mã hóa này: https://www.codeproject.com/Articles/731032/The-best-of-reactive-framework-

(Giả sử là tác giả cũng phải vật lộn để xem các tiện ích của phần mở rộng hoạt)

để đăng ký sự kiện này 'propertychange':

obs.Subscribe((args) => 
     { 
      //Do stuff with args.Sender,args.NewValue and args.OldValue 
     }); 

hoặc nếu chỉ quan tâm đến sự thay đổi sở hữu có một NuGet gói rxx với một phương pháp tương tự, sử dụng như sau:

IObservable<string> obs=Observable2.FromPropertyChangedPattern(() => obj.Name) 

này chứa rất nhiều phương pháp khuyến nông khác


hoặc nếu sự kiện này ngăn cản thay đổi sở hữu/bạn không muốn thực hiện INotifyPropertyChanged

class ObserveEvent_Simple 
{ 
    public static event EventHandler SimpleEvent; 
    static void Main() 
    {   
     IObservable<string> eventAsObservable = Observable.FromEventPattern(
      ev => SimpleEvent += ev, 
      ev => SimpleEvent -= ev); 
    } 
} 

tương tự như u/Gideon Engelberth từ http://rxwiki.wikidot.com/101samples#toc6


hoặc tận dụng điều này bài viết mã hóa dành cho việc chuyển đổi các sự kiện thành các sự kiện phản ứng

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

mà cũng giao dịch với đăng ký yếu (Một cái gì đó mà có thể/có thể không là một vấn đề tùy thuộc vào những gì bạn làm với IDisposable từ phương pháp OnSubscribe)

Các vấn đề liên quan