2013-01-22 25 views
5

Tôi đang đấu tranh ở đây. Thông thường tôi đã đọc một cuốn sách nhưng chưa có cuốn sách nào. Tôi đã tìm thấy vô số ví dụ về những thứ khác nhau để làm với việc đọc các luồng bằng cách sử dụng RX nhưng tôi thấy rất khó để có được đầu của tôi xung quanh.Cách thích hợp để tạo một Đài quan sát có thể đọc luồng đến cuối

Tôi biết tôi có thể sử dụng Observable.FromAsyncPattern để tạo trình bao bọc các phương thức BeginRead/EndRead hoặc BeginReadLine/EndReadLine của Stream.

Nhưng điều này chỉ đọc một lần - khi người quan sát đầu tiên đăng ký.

Tôi muốn một Observable sẽ tiếp tục đọc và bơm OnNext cho đến khi lỗi dòng hoặc kết thúc.

Ngoài việc này, tôi cũng muốn biết làm thế nào tôi có thể chia sẻ rằng có thể quan sát với nhiều người đăng ký để tất cả họ đều có được các mục.

+3

Bài viết tuyệt vời nhưng hết sức lỗi thời. Điều đó tham chiếu đến API cũ. Chương này * lấy các khái niệm đó và làm cho nó hoạt động với phiên bản Rx mới hơn. * http: //introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator –

+0

Ooh, không bao giờ biết về liên kết đó @LeeCampbell - rất hay! – JerKimball

+0

Điều gì đã xảy ra với câu hỏi này? Câu trả lời trước đó ở đâu? –

Trả lời

1

Các giải pháp là sử dụng Observable.Create

Dưới đây là một ví dụ có thể được adapated để đọc bất kỳ loại dòng

public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader) 
    { 

     return Observable.Create<Command>(async (subject, token) => 
     { 


      try 
      { 

       while (true) 
       { 

        if (token.IsCancellationRequested) 
        { 
         subject.OnCompleted(); 
         return; 
        } 

        //this part here can be changed to something like this 
        //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive); 

        Command cmd = await reader.ReadCommandAsync(); 

        subject.OnNext(cmd); 

       } 

      } 

      catch (Exception ex) 
      { 
       try 
       { 
        subject.OnError(ex); 
       } 
       catch (Exception) 
       { 
        Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere"); 
        throw; 
       } 
      } 

     }).Publish(); 

    } 

Đừng quên gọi Connect() trên IConnectableObservable trở

3

Thêm vào câu trả lời của Lee, sử dụng rxx:

using (new FileStream(@"filename.txt", FileMode.Open) 
     .ReadToEndObservable() 
     .Subscribe(x => Console.WriteLine(x.Length))) 
{ 
    Console.ReadKey(); 
} 

Chiều dài của bộ đệm đọc sẽ được xuất ra.

+0

Câu trả lời của Lee ở đâu? –

+0

Biến mất vì một lý do nào đó –

+0

Câu trả lời của anh đã bị xóa vì nó không thực sự cung cấp câu trả lời cho câu hỏi - chỉ cần một liên kết đến một cuốn [01] của anh ấy –

1

Heh - sẽ tái sử dụng một trong những câu trả lời khác của tôi ở đây (tốt, một phần của nó, dù sao):

Ref: Reading from NetworkStream corrupts the buffer

Trong đó, tôi đã có một phương pháp khuyến nông như thế này:

public static class Ext 
{   
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize) 
    {   
     // to hold read data 
     var buffer = new byte[bufferSize]; 
     // Step 1: async signature => observable factory 
     var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
      stream.BeginRead, 
      stream.EndRead); 
     return Observable.While(
      // while there is data to be read 
      () => stream.CanRead, 
      // iteratively invoke the observable factory, which will 
      // "recreate" it such that it will start from the current 
      // stream position - hence "0" for offset 
      Observable.Defer(() => asyncRead(buffer, 0, bufferSize)) 
       .Select(readBytes => buffer.Take(readBytes).ToArray())); 
    } 
} 

Bạn có thể sử dụng điều này như viết bằng một hình thức như vậy:

// Note: ToEnumerable works here because your filestream 
// has a finite length - don't do this with infinite streams! 
var blobboData = stream 
    .ReadObservable(bufferSize) 
    // take while we're still reading data 
    .TakeWhile(returnBuffer => returnBuffer.Length > 0) 
    .ToEnumerable() 
    // mash them all together 
    .SelectMany(buffer => buffer) 
    .ToArray(); 
+0

Xin chào JerKimball, cảm ơn vì giải pháp của bạn. Tôi đã sử dụng nó, sau đó có khó khăn thời gian xử lý các điều kiện lỗi. Làm thế nào để bạn xử lý khi bạn phải đóng kết nối? Tôi đã mở câu hỏi http://stackoverflow.com/questions/32079292/reading-from-stream-using-observable-through-fromasyncpattern-how-to-close-canc về điều đó nếu bạn có thể kiểm tra. –

+0

Rxx dường như làm điều đó rất khác nhau. Bất cứ ai có thể xây dựng? https://github.com/RxDave/Rxx/blob/master/Main/Source/Rxx.Bindings-Net451/System/IO/StreamExtensions.cs –

+0

Có một hình ảnh xác thực với việc triển khai này. Người đăng ký khác nhau sẽ nhận được các phần khác nhau của dữ liệu luồng! Người ta phải nối thêm một '.Publish(). Connect()' để đảm bảo tất cả người đăng ký nhận được tất cả dữ liệu. –

1

Bạn có thể sử dụng Repeat để tiếp tục đọc các dòng cho đến cuối luồng và Publish hoặc Replay để kiểm soát việc chia sẻ trên nhiều người đọc.

Một ví dụ về đơn giản, đầy đủ giải pháp Rx để đọc dòng từ bất kỳ dòng cho đến cuối cùng sẽ là:

public static IObservable<string> ReadLines(Stream stream) 
{ 
    return Observable.Using(
     () => new StreamReader(stream), 
     reader => Observable.FromAsync(reader.ReadLineAsync) 
          .Repeat() 
          .TakeWhile(line => line != null)); 
} 

Giải pháp này cũng lợi dụng thực tế là ReadLine lợi nhuận null khi kết thúc dòng đạt được.

Các vấn đề liên quan