2015-09-07 16 views
5

Tôi có một yêu cầu viết chủ đề cho NetworkStream. Một chuỗi khác đang đọc câu trả lời từ luồng này.C# Làm thế nào để phối hợp các chủ đề đọc và viết khi kết nối lại NetworkStream?

Tôi muốn làm cho nó có khả năng chịu lỗi. Trong trường hợp thất bại mạng, tôi muốn NetworkStream được xử lý thay thế bằng một thương hiệu mới.

Tôi đã tạo hai luồng xử lý các ngoại lệ IO/Socket. Mỗi người trong số họ sẽ cố gắng thiết lập lại kết nối. Tôi đang đấu tranh để phối hợp hai chủ đề này. Tôi đã phải đặt phần khóa làm cho mã khá phức tạp và dễ bị lỗi.

Có cách nào được khuyến nghị để triển khai tính năng này không? Có lẽ sử dụng một chủ đề duy nhất nhưng làm cho Đọc hoặc Viết không đồng bộ?

Trả lời

1

Tôi tìm thấy nó dễ nhất để có một chuỗi duy nhất xử lý sự phối hợp và viết và sau đó xử lý việc đọc bằng cách sử dụng async. Bằng cách bao gồm CancellationTokenSource trong đối tượng AsyncState, mã trình đọc có thể báo hiệu chuỗi gửi để khởi động lại kết nối, khi người nhận gặp lỗi (gọi EndRead hoặc nếu luồng hoàn tất). Sự phối hợp/nhà văn nằm trong một vòng lặp tạo ra kết nối và sau đó các vòng lặp tiêu tốn một mục số BlockingCollection<T> để gửi. Bằng cách sử dụng BlockingCollection.GetConsumingEnumerable(token), người gửi có thể bị hủy khi người đọc gặp lỗi.

private class AsyncState 
    { 
     public byte[] Buffer { get; set; } 
     public NetworkStream NetworkStream { get; set; } 
     public CancellationTokenSource CancellationTokenSource { get; set; } 
    } 

Sau khi bạn đã tạo kết nối, bạn có thể bắt đầu quá trình đọc không đồng bộ (giữ tự gọi khi nào mọi thứ đang hoạt động). Đi qua bộ đệm, suối và CancellationTokenSource trong đối tượng trạng thái:

var buffer = new byte[1]; 
stream.BeginRead(buffer, 0, 1, Callback, 
    new AsyncState 
    { 
     Buffer = buffer, 
     NetworkStream = stream, 
     CancellationTokenSource = cts2 
    }); 

Sau đó bạn bắt đầu đọc từ hàng đợi đầu ra của bạn và bằng văn bản cho dòng cho đến khi bị hủy bỏ hoặc một thất bại sẽ xảy ra:

using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true)) 
{ 
    foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken)) 
    { 
     ... 

. .. và trong Callback, bạn có thể kiểm tra lỗi và, nếu cần, nhấn nút CancellationTokenSource để báo hiệu chuỗi ghi để khởi động lại kết nối.

private void Callback(IAsyncResult ar) 
{ 
    var state = (AsyncState)ar.AsyncState; 

    if (ar.IsCompleted) 
    { 
    try 
    { 
     int bytesRead = state.NetworkStream.EndRead(ar); 
     LogState("Post read ", state.NetworkStream); 
    } 
    catch (Exception ex) 
    { 
     Log.Warn("Exception during EndRead", ex); 
     state.CancellationTokenSource.Cancel(); 
     return; 
    } 

    // Deal with the character received 
    char c = (char)state.Buffer[0]; 

    if (c < 0) 
    { 
     Log.Warn("c < 0, stream closing"); 
     state.CancellationTokenSource.Cancel(); 
     return; 
    } 

    ... deal with the character here, building up a buffer and 
    ... handing it out to the application when completed 
    ... perhaps using Rx Subject<T> to make it easy to subscribe 

    ... and finally ask for the next byte with the same Callback 

    // Launch the next reader 
    var buffer2 = new byte[1]; 
    var state2 = state.WithNewBuffer(buffer2); 
    state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2); 
+0

Đó là dòng tôi muốn khám phá. Tôi không thể hình dung mã cho trường hợp sử dụng của tôi mặc dù. Tôi cần phải đọc hai tiêu đề byte có chứa chiều dài của cơ thể và sau đó đọc cơ thể. Làm thế nào để làm điều đó? Tôi mặc dù có ReadHeader mà gọi BeingRead với ReadBody nhưng âm thanh như tôi cần phải đọc phần còn lại của cơ thể đồng bộ trong phương pháp ReadBody. Ngoài ra, tôi nên sử dụng CancellationTokenSource như thế nào? – Gatis

+0

Tôi đã thêm mã để hiển thị cách xử lý đọc một byte tại một thời điểm và cách mã thông báo hủy được chuyển vào và được sử dụng để báo hiệu chuỗi ghi khi xảy ra lỗi đọc. Bạn có thể bắt đầu bằng cách đọc hai byte để đọc async ban đầu và sau đó đọc phần còn lại trong một cuộc gọi. –

+0

Tuyệt vời. Tôi có thực sự cần state2 không? Tại sao tôi không thể tái sử dụng nhà nước? – Gatis

Các vấn đề liên quan