2012-05-01 22 views
7

Tôi đang cố gắng sử dụng Rx để đọc từ luồng nhận TCPClient và phân tích dữ liệu thành một chuỗi IObservable, được giới hạn bởi dòng mới "\ r \ n" Sau đây là cách tôi nhận được từ socket stream ...Phân tích IO mạng có thể quan sát

var messages = new Subject<string>(); 

var functionReceiveSocketData = 
      Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int> 
      (client.Client.BeginReceive, client.Client.EndReceive); 

Func<byte[], int, byte[]> copy = (bs, n) => 
     { 
      var rs = new byte[buffer.Length]; 
      bs.CopyTo(rs, 0); 
      return rs; 
     }; 

Observable 
    .Defer(() => 
      { 
       var buffer = new byte[50]; 
       return 
        from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None) 
       select copy(buffer, n); 
      }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x))); 

Đây là những gì tôi nghĩ ra để phân tích chuỗi. Đây là ngày không làm việc ...

obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

Đối tượng nhắn nhận được thông báo trong khối vì vậy tôi đang cố gắng để concat họ và kiểm tra xem chuỗi nối chứa xuống dòng, do đó tín hiệu bộ đệm để đóng và sản lượng đệm miếng, mảnh nhỏ. Không chắc tại sao nó không hoạt động. Có vẻ như tôi chỉ nhận được đoạn đầu tiên của obsStrings.

Vì vậy, tôi đang tìm hai thứ. Tôi muốn đơn giản hóa việc đọc luồng io và loại bỏ việc sử dụng chủ đề thư. Thứ hai, tôi muốn phân tích cú pháp chuỗi của mình. Tôi đã được hack trên này một chút và không thể đưa ra một giải pháp làm việc. Tôi là người mới bắt đầu với Rx.

EDIT: Đây là sản phẩm hoàn chỉnh sau khi vấn đề đã được giải quyết ....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None) 
      .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray()) 
      .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
      .Where(x => x.EndsWith("\r\n")) 
      .Select(buffered => String.Join("", buffered)) 
      .Select(a => a.Replace("\n", "")); 

"ReceiveUntilCompleted" là một phần mở rộng từ dự án RXX.

Trả lời

3
messages 
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
    .Where(x => x.EndsWith("\r\n")) 
+0

Tôi thấy rằng tôi không cần .Buffer (1); cuối cùng. – TK3

+0

Xóa nó khỏi câu trả lời. – ronag

1

Thay vì Subscribe và sử dụng Subject, bạn có thể thử chỉ Select:

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

Bây giờ giả sử này có tất cả đi vào một quan sát mới được gọi là messages, vấn đề tiếp theo của bạn là trong dòng này

var obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

Bạn đang sử dụng cả hai BufferScan và đang cố gắng làm điều tương tự trong cả hai! Lưu ý rằng Buffer cần có bộ chọn đóng.

gì bạn thực sự muốn là:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n"))) 
         .Select(buffered => String.Join(buffered)); 

Mà cho Buffered một liên quan khi để đóng cửa sổ (khi nó chứa \ r \ n) và cung cấp cho Chọn số lượng đệm để tiếp nhau quan sát được. Điều này dẫn đến một quan sát mới về các chuỗi phân tách của bạn.

Một vấn đề là bạn vẫn có thể có dòng mới ở giữa đoạn và điều này sẽ gây ra sự cố. Một ý tưởng đơn giản là quan sát trên các nhân vật chứ không phải là khối chuỗi đầy đủ, chẳng hạn như:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

Sau đó, bạn có thể làm messages.Where(c => c != '\r') để bỏ qua \r và thay đổi bộ đệm để:

var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n'))) 
         .Select(buffered => String.Join("", buffered)); 
+0

Kiểm tra chỉnh sửa của tôi. Vẫn nhận được đầu ra lạ. – TK3

+0

Hãy suy nghĩ bạn muốn 'messages.Where (x => x == '\ n')' không 'messages.Where (x => x! = '\ N')' trong cuộc gọi 'Buffer'. Nghĩa là, nó sẽ phá vỡ bộ đệm trên mọi dòng mới. – yamen

+0

Vẫn nhận được các phần rời rạc ra khỏi điều này.Tôi tự hỏi nếu đó là một vấn đề luồng mà thử nghiệm đang được thực hiện trên một sợi khác và bộ đệm tiếp tục hoặc phá vỡ sớm để đệm trở nên không xác định. – TK3

Các vấn đề liên quan