Tôi tìm thấy nó dễ nhất để có một chuỗi duy nhất xử lý sự phối hợp và viết và sau đó xử lý việc đọc bằng cách sử dụng async. Bằng cách bao gồm CancellationTokenSource
trong đối tượng AsyncState
, mã trình đọc có thể báo hiệu chuỗi gửi để khởi động lại kết nối, khi người nhận gặp lỗi (gọi EndRead
hoặc nếu luồng hoàn tất). Sự phối hợp/nhà văn nằm trong một vòng lặp tạo ra kết nối và sau đó các vòng lặp tiêu tốn một mục số BlockingCollection<T>
để gửi. Bằng cách sử dụng BlockingCollection.GetConsumingEnumerable(token)
, người gửi có thể bị hủy khi người đọc gặp lỗi.
private class AsyncState
{
public byte[] Buffer { get; set; }
public NetworkStream NetworkStream { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
}
Sau khi bạn đã tạo kết nối, bạn có thể bắt đầu quá trình đọc không đồng bộ (giữ tự gọi khi nào mọi thứ đang hoạt động). Đi qua bộ đệm, suối và CancellationTokenSource
trong đối tượng trạng thái:
var buffer = new byte[1];
stream.BeginRead(buffer, 0, 1, Callback,
new AsyncState
{
Buffer = buffer,
NetworkStream = stream,
CancellationTokenSource = cts2
});
Sau đó bạn bắt đầu đọc từ hàng đợi đầu ra của bạn và bằng văn bản cho dòng cho đến khi bị hủy bỏ hoặc một thất bại sẽ xảy ra:
using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true))
{
foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken))
{
...
. .. và trong Callback, bạn có thể kiểm tra lỗi và, nếu cần, nhấn nút CancellationTokenSource để báo hiệu chuỗi ghi để khởi động lại kết nối.
private void Callback(IAsyncResult ar)
{
var state = (AsyncState)ar.AsyncState;
if (ar.IsCompleted)
{
try
{
int bytesRead = state.NetworkStream.EndRead(ar);
LogState("Post read ", state.NetworkStream);
}
catch (Exception ex)
{
Log.Warn("Exception during EndRead", ex);
state.CancellationTokenSource.Cancel();
return;
}
// Deal with the character received
char c = (char)state.Buffer[0];
if (c < 0)
{
Log.Warn("c < 0, stream closing");
state.CancellationTokenSource.Cancel();
return;
}
... deal with the character here, building up a buffer and
... handing it out to the application when completed
... perhaps using Rx Subject<T> to make it easy to subscribe
... and finally ask for the next byte with the same Callback
// Launch the next reader
var buffer2 = new byte[1];
var state2 = state.WithNewBuffer(buffer2);
state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2);
Đó là dòng tôi muốn khám phá. Tôi không thể hình dung mã cho trường hợp sử dụng của tôi mặc dù. Tôi cần phải đọc hai tiêu đề byte có chứa chiều dài của cơ thể và sau đó đọc cơ thể. Làm thế nào để làm điều đó? Tôi mặc dù có ReadHeader mà gọi BeingRead với ReadBody nhưng âm thanh như tôi cần phải đọc phần còn lại của cơ thể đồng bộ trong phương pháp ReadBody. Ngoài ra, tôi nên sử dụng CancellationTokenSource như thế nào? – Gatis
Tôi đã thêm mã để hiển thị cách xử lý đọc một byte tại một thời điểm và cách mã thông báo hủy được chuyển vào và được sử dụng để báo hiệu chuỗi ghi khi xảy ra lỗi đọc. Bạn có thể bắt đầu bằng cách đọc hai byte để đọc async ban đầu và sau đó đọc phần còn lại trong một cuộc gọi. –
Tuyệt vời. Tôi có thực sự cần state2 không? Tại sao tôi không thể tái sử dụng nhà nước? – Gatis