2012-09-27 30 views
5

Chúng tôi đang phát triển một dịch vụ WCF để truyền trực tuyến một lượng lớn dữ liệu, do đó chúng tôi đã chọn sử dụng chức năng WCF Streaming kết hợp với chuỗi tuần tự protobuf-net.Nối tiếp đối tượng Lười biếng, luồng với protobuf-net

Bối cảnh:

chung một ý tưởng là để serialize đối tượng trong dịch vụ, hãy viết chúng thành một dòng suối và gửi. Ở đầu kia người gọi sẽ nhận được một đối tượng Stream và nó có thể đọc tất cả dữ liệu.

Vì vậy, hiện nay các phương pháp mã dịch vụ trông hơi như thế này:

public Result TestMethod(Parameter parameter) 
{ 
    // Create response 
    var responseObject = new BusinessResponse { Value = "some very large data"}; 

    // The resposne have to be serialized in advance to intermediate MemoryStream 
    var stream = new MemoryStream(); 
    serializer.Serialize(stream, responseObject); 
    stream.Position = 0; 

    // ResultBody is a stream, Result is a MessageContract 
    return new Result {ResultBody = stream}; 
} 

Đối tượng BusinessResponse là serialized đến một MemoryStream và được trả về từ một phương pháp. Về phía khách hàng mã gọi trông như thế:

var parameter = new Parameter(); 

// Call the service method 
var methodResult = channel.TestMethod(parameter); 

// protobuf-net deserializer reads from a stream received from a service. 
// while reading is performed by protobuf-net, 
// on the service side WCF is actually reading from a 
// memory stream where serialized message is stored 
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody); 
return result; 

Vì vậy, khi serializer.Deserialize() được gọi là nó đọc từ một dòng methodResult.ResultBody, trong cùng thời gian trên WCF bên dịch vụ đang đọc một MemoryStream, đã được trở về từ a TestMethod.

Vấn đề:

gì chúng tôi muốn đạt được là để thoát khỏi một MemoryStream và ban đầu serialization của toàn bộ đối tượng ở phía bên dịch vụ cùng một lúc. Vì chúng tôi sử dụng phát trực tuyến, chúng tôi muốn tránh việc giữ một đối tượng được tuần tự hóa trong bộ nhớ trước khi gửi.

Idea:

Các giải pháp hoàn hảo sẽ trở lại một, đối tượng Suối custom-made rỗng (từ TestMethod()) với một tham chiếu đến một đối tượng mà là để được tuần tự (object 'BusinessResponse' trong ví dụ của tôi). Vì vậy, khi WCF gọi phương thức luồng của tôi, tôi nội bộ tuần tự hóa một phần của đối tượng bằng cách sử dụng protobuf-net và trả lại cho người gọi mà không lưu trữ nó trong bộ nhớ.

Và bây giờ có một vấn đề, bởi vì những gì chúng tôi thực sự cần là khả năng sắp xếp từng phần một đối tượng vào thời điểm luồng được đọc. Tôi hiểu rằng đây là cách hoàn toàn khác nhau của serialization - thay vì đẩy một đối tượng vào serializer, tôi muốn yêu cầu một đoạn nội dung nối tiếp từng mảnh.

Loại serialization đó có thể sử dụng protobuf-net không?

+0

Đây có phải là một đối tượng không? Hoặc một loạt các đối tượng (một bộ sưu tập)? Cho dù nó là giá trị nhìn vào điều này thực sự phụ thuộc vào cấu hình WCF của bạn - trong hầu hết các cấu hình nó sẽ luôn luôn đệm toàn bộ tin nhắn trong bộ nhớ * anyway * - vì vậy nó có thể dễ dàng để không thay đổi bất cứ điều gì. –

+0

Xin chào Marc, WCF được định cấu hình để không sử dụng tính năng đệm - đó là điểm phát trực tiếp - Tôi muốn giảm dung lượng bộ nhớ trên phía máy chủ. Ngoài ra, nếu tôi muốn sắp xếp từng bộ sưu tập các đối tượng, tôi sẽ sử dụng 'SerializeWithLengthPrefix()' mỗi khi Client gọi 'Read()' và bộ đệm cơ bản của tôi nhỏ hơn số lượng dữ liệu yêu cầu. Vấn đề ở đây là tôi muốn có thể phân chia một chuỗi tuần tự hóa đối tượng. –

+0

câu hỏi thú vị. I * nghĩ * điều này có thể được khái quát hóa, về cơ bản là một luồng giả mạo làm cho công việc Đọc và Viết là các đồng tác vụ. Nếu bạn không nhớ có thêm một Thread, nó có thể được thực hiện với một cổng đơn giản, tuy nhiên iirc Jon đã có một số ý tưởng thú vị. Tôi sẽ phải xem xét và lấy lại cho bạn. Tuy nhiên, tôi có thể nói mà không có nghi ngờ rằng tôi không có ý định hack cốt lõi của protobuf-net cho mục đích :) –

Trả lời

2

Tôi đã nấu một số mã có lẽ nằm dọc theo ý tưởng cổng của Marc.

public class PullStream : Stream 
{ 
    private byte[] internalBuffer; 
    private bool ended; 
    private static ManualResetEvent dataAvailable = new ManualResetEvent(false); 
    private static ManualResetEvent dataEmpty = new ManualResetEvent(true); 

    public override bool CanRead 
    { 
     get { return true; } 
    } 

    public override bool CanSeek 
    { 
     get { return false; } 
    } 

    public override bool CanWrite 
    { 
     get { return true; } 
    } 

    public override void Flush() 
    { 
     throw new NotImplementedException(); 
    } 

    public override long Length 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public override long Position 
    { 
     get 
     { 
      throw new NotImplementedException(); 
     } 
     set 
     { 
      throw new NotImplementedException(); 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     dataAvailable.WaitOne(); 
     if (count >= internalBuffer.Length) 
     { 
      var retVal = internalBuffer.Length; 
      Array.Copy(internalBuffer, buffer, retVal); 
      internalBuffer = null; 
      dataAvailable.Reset(); 
      dataEmpty.Set(); 
      return retVal; 
     } 
     else 
     { 
      Array.Copy(internalBuffer, buffer, count); 
      internalBuffer = internalBuffer.Skip(count).ToArray(); // i know 
      return count; 
     } 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     throw new NotImplementedException(); 
    } 

    public override void SetLength(long value) 
    { 
     throw new NotImplementedException(); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     dataEmpty.WaitOne(); 
     dataEmpty.Reset(); 

     internalBuffer = new byte[count]; 
     Array.Copy(buffer, internalBuffer, count); 

     Debug.WriteLine("Writing some data"); 

     dataAvailable.Set(); 
    } 

    public void End() 
    { 
     dataEmpty.WaitOne(); 
     dataEmpty.Reset(); 

     internalBuffer = new byte[0]; 

     Debug.WriteLine("Ending writes"); 

     dataAvailable.Set(); 
    } 
} 

Đây là dòng hậu duệ dòng đơn giản chỉ triển khai Đọc và Viết (và Kết thúc). Các khối Đọc trong khi không có sẵn dữ liệu và các khối Ghi trong khi dữ liệu có sẵn. Bằng cách này, chỉ có một bộ đệm byte có liên quan. Việc sao chép LINQ của phần còn lại được mở để tối ưu hóa ;-) Phương thức End được thêm vào để không bị chặn khi Read được thực hiện khi không có dữ liệu và không có dữ liệu nào được ghi thêm nữa.

Bạn phải ghi vào luồng này từ một chuỗi riêng biệt.Tôi hiển thị điều này bên dưới:

// create a large object 
    var obj = new List<ToSerialize>(); 
    for(int i = 0; i <= 1000; i ++) 
     obj.Add(new ToSerialize { Test = "This is my very loooong message" }); 
    // create my special stream to read from 
    var ms = new PullStream(); 
    new Thread(x => 
    { 
     ProtoBuf.Serializer.Serialize(ms, obj); 
     ms.End(); 
    }).Start(); 
    var buffer = new byte[100]; 
    // stream to write back to (just to show deserialization is working too) 
    var ws = new MemoryStream(); 
    int read; 
    while ((read = ms.Read(buffer, 0, 100)) != 0) 
    { 
     ws.Write(buffer, 0, read); 
     Debug.WriteLine("read some data"); 
    } 
    ws.Position = 0; 
    var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws); 

Tôi hy vọng điều này sẽ giải quyết được vấn đề của bạn :-) Thật thú vị khi viết mã này.

Kính trọng, Jacco

+0

Có thể trả về Pullstream (không chắc chắn tên bao gồm những gì đang cố gắng làm) như ResultBody của bạn – Jacco

Các vấn đề liên quan