Tôi đang cố gắng để viết một phương pháp mở rộng cho System.Net.WebSocket mà sẽ biến nó thành một IObserver bằng cách sử dụng phần mở rộng phản ứng (Rx.NET). Bạn có thể xem mã bên dưới:Làm thế nào để thực hiện một IObserver với async/await OnNext/OnError/OnCompleted phương pháp?
public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
// Wrap the web socket in an interface that's a little easier to manage
var webSocketMessageStream = new WebSocketMessageStream(webSocket);
// Create the output stream to the client
return Observer.Create<T>(
onNext: async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
onError: async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
onCompleted: async() => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
);
}
Mã này làm việc, nhưng tôi lo ngại về việc sử dụng async/chờ đợi bên trong onNext, onerror, và onCompleted lambdas. Tôi biết rằng điều này trả về một số không đồng bộ async lambda, được cau mày (và đôi khi gây ra các vấn đề mà tôi đã chạy vào bản thân mình).
Tôi đã đọc trên tài liệu Rx.NET cũng như các bài đăng blog trên internet và tôi dường như không tìm được cách thích hợp (nếu có) để sử dụng phương thức async/await trong IObserver. Có cách nào thích hợp để làm việc này không? Nếu không, thì làm thế nào tôi có thể giải quyết vấn đề này?
Giả định của bạn sai. Trên thực tế, bạn đang chuyển 'Func>' (hoặc một số biến thể nhận tham số) thành 'onNext' vv Các đại biểu không có loại trả về' void'. Họ trở lại nhiệm vụ. –
spender
Tôi không chắc chắn tôi sẽ theo bạn. Tôi có thể gán một phương thức không đồng bộ cho một Hành động như thế này: Thử nghiệm hành động = async() => đang chờ DoSomethingAsync(); Việc biên dịch này rất tốt và hành động "thử nghiệm" không thể chờ đợi được. Kiểm tra cuộc gọi() trả về ngay lập tức và trước khi DoSomethingAsync() hoàn thành. Định nghĩa phương thức cho Observer.Create() chấp nhận các hành động như các tham số cho onNext, onError và onCompleted. Đây là lý do tại sao mã tôi đăng ở trên hoạt động. Tuy nhiên, khi ai đó gọi OnNext (x) trên IObserver của tôi, nó sẽ không đợi phần không đồng bộ của phương thức kết thúc trước khi trở về. –
Rất tiếc. Lỗi của tôi. Nhìn vào nó ngay bây giờ. – spender