2012-04-12 24 views
8

Là một phần của ứng dụng của chúng tôi (trong sản xuất cho khoảng 4 tháng nay), chúng tôi có một dòng dữ liệu đến từ một thiết bị bên ngoài mà chúng ta chuyển đổi sang một IObservablephương pháp ưa thích để tạo ra một IObservable <String> từ một Stream

Cho đến bây giờ chúng tôi đã sử dụng những điều sau đây để tạo ra nó và nó hoạt động khá tốt.

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return Observable 
      .Create<string>(observer => Scheduler.ThreadPool 
      .Schedule(() => ReadLoop(streamReader, observer))); 
} 

private void ReadLoop(StreamReader reader, IObserver<string> observer) 
{ 
    while (true) 
    { 
     try 
     { 
      var line = reader.ReadLine(); 
      if (line != null) 
      { 
       observer.OnNext(line); 
      } 
      else 
      { 
       observer.OnCompleted(); 
       break; 
      } 
     } 
     catch (Exception ex) 
     { 
      observer.OnError(ex); 
      break; 
     } 
    } 
} 

Đêm qua tôi tự hỏi nếu có một cách để sử dụng cú pháp yield return để đạt được kết quả tương tự và đến với điều này:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return ReadLoop(streamReader) 
      .ToObservable(Scheduler.ThreadPool); 
} 

private IEnumerable<string> ReadLoop(StreamReader reader) 
{ 
    while (true) 
    { 
     var line = reader.ReadLine(); 
     if (line != null) 
     { 
      yield return line; 
     } 
     else 
     { 
      yield break; 
     } 
    } 
} 

Dường như với hoạt động khá tốt và nó sạch hơn nhiều, nhưng tôi đã tự hỏi liệu có bất kỳ ưu điểm hay khuyết điểm nào trên con đường này hay không, hoặc nếu có một cách tốt hơn hoàn toàn.

+2

Pro: 'năng suất return' hỗ trợ lười biếng/trễ tải của bộ sưu tập của bạn. –

+2

Con: khi một ngoại lệ được ném, nó không gọi OnException, nó chỉ bong bóng lên –

+0

Tôi đoán nó phụ thuộc nếu bạn không nhớ đốt một chuỗi để đọc vòng lặp của bạn, mà đi xuống bao nhiêu thiết bị bạn cần hỗ trợ. Tôi đã viết một AsyncTextReader đó là chính nó quan sát để làm một cái gì đó tương tự, nhưng ở quy mô. Chắc chắn những ngày này bạn có thể AWAIT một cái gì đó ... – piers7

Trả lời

11

Tôi nghĩ rằng bạn đã có một ý tưởng hay ở đó (bật Luồng thành Có thể đếm được rồi IObservable). Tuy nhiên, mã Enumberable có thể được nhiều bụi:

IEnumerable<string> ReadLines(Stream stream) 
{ 
    using (StreamReader reader = new StreamReader(stream)) 
    { 
     while (!reader.EndOfStream) 
      yield return reader.ReadLine(); 
    } 
} 

Và sau đó Quan sát:

IObservable<string> ObserveLines(Stream inputStream) 
{ 
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool); 
} 

Đây là ngắn hơn, dễ đọc hơn, và đúng disposes của suối. Nó cũng lười biếng.

Tiện ích mở rộng ToObservable chịu trách nhiệm bắt các sự kiện OnNext (dòng mới) cũng như sự kiện OnCompleted (kết thúc đếm) và OnError.

+0

Đẹp, rất sạch sẽ. Tôi sẽ phải thử vào ngày mai. Mối quan tâm duy nhất của tôi là tôi có thể nhận được một dấu null như là mục cuối cùng trong Observable, nhưng đó là dễ dàng để lọc ra với. Where (line => line! = Null) – baralong

2

Tôi không có mã để xử lý, nhưng dưới đây là cách thực hiện async CTP không đồng bộ trước.

[Ghi chú cho skim-reader: không cần phải bận tâm nếu bạn không cần phải mở rộng nhiều]

Tạo một thực hiện AsyncTextReader, đó là tự quan sát được. Các ctor mất trong một Stream, và thực hiện một BeginRead (256byte) trên dòng, đi qua chính nó như là sự tiếp tục, sau đó trở về.

Khi tiếp tục được nhập, hãy gọi EndRead và thêm byte được trả về vào một bộ đệm nhỏ trên lớp. Lặp lại điều này cho đến khi bộ đệm chứa một hoặc nhiều chuỗi cuối dòng (theo TextWriter). Khi điều này xảy ra, hãy gửi (các) bit của bộ đệm ra dưới dạng chuỗi thông qua giao diện Quan sát và lặp lại.

Khi bạn hoàn tất, hãy báo hiệu OnComplete, v.v. (và xử lý luồng). Nếu bạn nhận được một ngoại lệ được ném từ EndReadByte trong sự tiếp tục của bạn, hãy nắm bắt nó và truyền nó ra giao diện OnError.

mã gọi sau đó trông giống như:

IObservable = new AsyncTextReader (stream);

Điều này cân tốt. Chỉ cần chắc chắn rằng bạn không làm bất cứ điều gì quá câm với việc xử lý bộ đệm.

mã giả:

public ctor(Stream stream){ 
    this._stream = stream; 
    BeginRead(); 
    return; 
} 

private void BeginRead(){ 
    // kick of async read and return (synchronously) 
    this._stream.BeginRead(_buffer,0,256,EndRead,this); 
} 

private void EndRead(IAsyncResult result){ 
    try{ 
     // bytesRead will be *up to* 256 
     var bytesRead = this._stream.EndRead(result); 
     if(bytesRead < 1){ 
      OnCompleted(); 
      return; 
     } 
     // do work with _buffer, _listOfBuffers 
     // to get lines out etc... 
     OnNext(aLineIFound); // times n 
     BeginRead(); // go round again 
    }catch(Exception err){ 
     OnException(err); 
    } 
} 

Ok, đây là APM, và một cái gì đó chỉ là một người mẹ có thể yêu. Tôi rất mong chờ sự thay thế.

ps: người đọc có nên đóng luồng không là một câu hỏi thú vị. Tôi nói không, bởi vì nó không tạo ra nó.

0

Với async/chờ đợi hỗ trợ, sau đây rất có thể là lựa chọn tốt nhất của bạn:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    return Observable.Using(() => new StreamReader(inputStream), 
     sr => Observable.Create<string>(async (obs, ct) => 
     { 
      while (true) 
      { 
       ct.ThrowIfCancellationRequested(); 
       var line = await sr.ReadLineAsync().ConfigureAwait(false); 
       if (line == null) 
        break; 
       obs.OnNext(line); 
      } 
      obs.OnCompleted(); 
    })); 
} 
Các vấn đề liên quan