2016-02-03 29 views
11

Tôi đang cố gắng để viết một phương pháp mở rộng cho System.Net.WebSocket mà sẽ biến nó thành một IObserver bằng cách sử dụng phần mở rộng phản ứng (Rx.NET). Bạn có thể xem mã bên dưới:Làm thế nào để thực hiện một IObserver với async/await OnNext/OnError/OnCompleted phương pháp?

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer) 
{ 
    // Wrap the web socket in an interface that's a little easier to manage 
    var webSocketMessageStream = new WebSocketMessageStream(webSocket); 

    // Create the output stream to the client 
    return Observer.Create<T>(
     onNext:  async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)), 
     onError:  async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)), 
     onCompleted: async()  => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected") 
    ); 
} 

Mã này làm việc, nhưng tôi lo ngại về việc sử dụng async/chờ đợi bên trong onNext, onerror, và onCompleted lambdas. Tôi biết rằng điều này trả về một số không đồng bộ async lambda, được cau mày (và đôi khi gây ra các vấn đề mà tôi đã chạy vào bản thân mình).

Tôi đã đọc trên tài liệu Rx.NET cũng như các bài đăng blog trên internet và tôi dường như không tìm được cách thích hợp (nếu có) để sử dụng phương thức async/await trong IObserver. Có cách nào thích hợp để làm việc này không? Nếu không, thì làm thế nào tôi có thể giải quyết vấn đề này?

+0

Giả định của bạn sai. Trên thực tế, bạn đang chuyển 'Func >' (hoặc một số biến thể nhận tham số) thành 'onNext' vv Các đại biểu không có loại trả về' void'. Họ trở lại nhiệm vụ. – spender

+0

Tôi không chắc chắn tôi sẽ theo bạn. Tôi có thể gán một phương thức không đồng bộ cho một Hành động như thế này: Thử nghiệm hành động = async() => đang chờ DoSomethingAsync(); Việc biên dịch này rất tốt và hành động "thử nghiệm" không thể chờ đợi được. Kiểm tra cuộc gọi() trả về ngay lập tức và trước khi DoSomethingAsync() hoàn thành. Định nghĩa phương thức cho Observer.Create() chấp nhận các hành động như các tham số cho onNext, onError và onCompleted. Đây là lý do tại sao mã tôi đăng ở trên hoạt động. Tuy nhiên, khi ai đó gọi OnNext (x) trên IObserver của tôi, nó sẽ không đợi phần không đồng bộ của phương thức kết thúc trước khi trở về. –

+0

Rất tiếc. Lỗi của tôi. Nhìn vào nó ngay bây giờ. – spender

Trả lời

12

Đăng ký không thực hiện các phương thức async. Vì vậy, những gì xảy ra ở đây là bạn đang sử dụng một cơ chế lửa và quên từ async void. Vấn đề là các tin nhắn onNext sẽ không còn được đăng.

Thay vì gọi phương thức async bên trong Đăng ký, bạn nên bọc nó vào đường ống để cho phép Rx đợi.

Được phép sử dụng lửa và quên trên onErroronCompleted vì chúng được đảm bảo là điều cuối cùng được gọi là từ Observable. Hãy nhớ rằng các tài nguyên liên quan đến Observable có thể được dọn sạch sau khi onErroronCompleted được trả về, trước khi chúng hoàn tất.

Tôi đã viết một phương pháp khuyến nông nhỏ mà làm tất cả điều này:

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted) 
    { 
     return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat() 
      .Subscribe(
      e => { }, // empty 
      onError, 
      onCompleted); 
    } 

Đầu tiên chúng tôi chuyển đổi phương pháp async onNext thành một Observable, cầu nối hai thế giới async. Điều này có nghĩa là không đồng bộ/chờ đợi bên trong onNext sẽ được tôn trọng. Tiếp theo chúng ta bọc phương thức này vào Defer để nó không bắt đầu ngay khi được tạo. Thay vào đó, Concat sẽ gọi lệnh tiếp theo bị trì hoãn chỉ khi kết thúc trước đó.

Ps. Tôi hy vọng bản phát hành tiếp theo của Rx.Net sẽ có hỗ trợ async khi đăng ký.

+0

Ý của bạn là gì bởi "tin nhắn onNext sẽ không còn được tuần tự hóa"? –

+1

Điều Dorus nói là hai lớp không đồng bộ đang được giới thiệu ở đây. Nếu nhận được một tin nhắn, nó sẽ gọi phương thức 'WriteMessageAsync'. Nếu nhận được một tin nhắn khác, thì nó sẽ thực thi 'WriteMessageAsync' bất kể cuộc gọi trước đó đã hoàn thành hay chưa. Điều này có nghĩa là bạn có thể có các thông điệp đặt hàng (đặc biệt nếu việc lên lịch TaskPool cũng được tham gia). tức là các tin nhắn văn bản sẽ không còn được sắp xếp theo thứ tự. –

+0

@Dorus, Cảm ơn bạn đã phản hồi. Bạn có nhớ chỉnh sửa bài đăng của mình để giải thích phương pháp mở rộng của bạn hoạt động chi tiết hơn một chút không? Một điều tôi tự hỏi là nếu điều này thực sự sẽ tận dụng lợi thế của async/await hoặc nó sẽ chặn một thread trong khi chờ đợi OnNext để hoàn thành? Ngoài ra, tôi nhận thấy rằng Rx.NET có phiên bản beta của v2.3 trên NuGet, nhưng tôi dường như không thể tìm thấy bất kỳ ghi chú phát hành nào cho họ. Bạn có biết họ có thể ở đâu không? –

Các vấn đề liên quan