2013-07-04 45 views
7

Tôi có một phương pháp async đó là một phương pháp lâu dài mà đọc một dòng suối và khi nó tìm thấy một cái gì đó cháy một sự kiện:Chuyển đổi một phương pháp async trở IObservable <>

public static async void GetStream(int id, CancellationToken token) 

Phải mất một mã thông báo hủy bỏ vì nó được tạo ra trong một nhiệm vụ mới. Bên trong nó gọi await khi nó đọc một dòng:

var result = await sr.ReadLineAsync() 

Bây giờ, tôi muốn chuyển đổi này cho một phương thức trả về một IObservable <> vì vậy mà tôi có thể sử dụng với các phần mở rộng phản ứng. Từ những gì tôi đã đọc, cách tốt nhất để làm điều này là sử dụng Observable.Create, và kể từ RX 2.0 bây giờ cũng hỗ trợ async tôi có thể nhận tất cả để làm việc với một cái gì đó như thế này:

public static IObservable<Message> ObservableStream(int id, CancellationToken token) 
{ 
    return Observable.Create<Message>(
     async (IObserver<Message> observer) => 
      { 

Phần còn lại của mã bên trong là như nhau, nhưng thay vì bắn các sự kiện, tôi gọi số observer.OnNext(). Nhưng, điều này cảm thấy sai. Đối với một điều tôi trộn CancellationTokens lên trong đó, và mặc dù thêm từ khóa async làm cho nó hoạt động, đây thực sự là điều tốt nhất để làm gì? Tôi gọi ObservableStream của tôi như thế này:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m)); 
+1

Bạn nên hầu như không bao giờ sử dụng 'async void', chắc chắn không phải trong các phương pháp thư viện. – svick

Trả lời

1

Bạn nên thay đổi GetStream để trả về một nhiệm vụ, thay vì void (trở về async khoảng trống là không tốt, trừ khi thật cần thiết, như svick nhận xét). Một khi bạn trở lại một nhiệm vụ, bạn chỉ có thể gọi. ToObservable() và bạn đã hoàn tất.

Ví dụ:

public static async Task<int> GetStream(int id, CancellationToken token) { ... } 

Sau đó,

GetStream(1, new CancellationToken(false)) 
    .ToObservable() 
    .Subscribe(Console.Write); 
+0

Tôi nghĩ 'GetStream()' kích hoạt một sự kiện vài lần, vì vậy 'Task ' sẽ không đủ. – svick

+0

Vâng, chữ ký là một ví dụ, nhưng trong bản chỉnh sửa, tôi ngụ ý loại bỏ GetStream bằng cách sử dụng trực tiếp ToObservable(). Tôi đang cập nhật câu trả lời của mình với một số giải thích rõ ràng. –

+0

Ah đúng, cảm ơn. Có, GetStream chạy bao giờ và kích hoạt một sự kiện khi nó tìm thấy một tin nhắn. Vì vậy, nếu tôi sử dụng 'Task ' thay vào đó, điều đó có hợp lý không? Tôi sẽ bỏ sự kiện bắn vào đó vì thứ gì đó khác? –

11

Bạn là chính xác. Khi bạn đại diện cho giao diện của mình thông qua một số IObservable, bạn nên tránh yêu cầu người gọi cung cấp CancellationToken. Điều đó không có nghĩa là bạn không thể sử dụng chúng trong nội bộ. Rx cung cấp một số cơ chế để tạo ra các trường hợp CancellationToken bị hủy khi người quan sát hủy đăng ký khỏi quan sát của bạn.

Có một số cách để giải quyết vấn đề của bạn. Đơn giản nhất yêu cầu hầu như không có thay đổi nào trong mã của bạn. Nó sử dụng một quá tải của Observable.Create mà cung cấp cho bạn một CancellationToken kích hoạt nếu việc hủy đăng ký gọi:

public static IObservable<Message> ObservableStream(int id) 
{ 
    return Observable.Create<Message>(async (observer, token) => 
    { 
     // no exception handling required. If this method throws, 
     // Rx will catch it and call observer.OnError() for us. 
     using (var stream = /*...open your stream...*/) 
     { 
      string msg; 
      while ((msg = await stream.ReadLineAsync()) != null) 
      { 
       if (token.IsCancellationRequested) { return; } 
       observer.OnNext(msg); 
      } 
      observer.OnCompleted(); 
     } 
    }); 
} 
+0

Không nên 'if (token.IsCancellationRequested) {return; } 'be' if (token.IsCancellationRequested) {break; } 'để' OnCompleted' được gọi? – TiMoch

+0

@TiMoch không có người quan sát nào đã hủy đăng ký trong trường hợp này, không cần phải gửi cho họ bất kỳ thông báo bổ sung nào – Brandon

Các vấn đề liên quan