2014-04-11 11 views
15

Tôi muốn gọi lại một chức năng không đồng bộ trong một thuê bao Rx.Làm thế nào để gọi lại chức năng async từ rx đăng ký?

Ví dụ: như thế:

public class Consumer 
{ 
    private readonly Service _service = new Service(); 

    public ReplaySubject<string> Results = new ReplaySubject<string>(); 

    public void Trigger() 
    { 
     Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); 
    } 

    public Task RunAsync() 
    { 
     return _service.DoAsync(); 
    } 
} 

public class Service 
{ 
    public async Task<string> DoAsync() 
    { 
     return await Task.Run(() => Do()); 
    } 

    private static string Do() 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(200)); 
     throw new ArgumentException("invalid!"); 
     return "foobar"; 
    } 
} 

[Test] 
public async Task Test() 
{ 
    var sut = new Consumer(); 
    sut.Trigger(); 
    var result = await sut.Results.FirstAsync(); 
} 

Những gì cần phải được thực hiện, để nắm bắt những ngoại lệ đúng cách?

+1

Tôi chỉ nghĩ rằng tôi có thể đặt async ở nơi đầu tiên. Nhưng tiếc là điều này không giải quyết được vấn đề tôi đang gặp phải. Tôi sẽ đăng một ví dụ rõ ràng hơn. –

+0

có thể trùng lặp của [Có cách nào để đăng ký người quan sát không đồng bộ] (http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async) – Zache

Trả lời

9

Bạn không muốn vượt qua một phương pháp async-Subscribe, bởi vì đó sẽ tạo ra một phương pháp async void. Làm hết sức mình để tránh async void.

Trong trường hợp của bạn, tôi nghĩ những gì bạn muốn là gọi phương thức async cho mỗi phần tử của chuỗi và sau đó lưu vào bộ nhớ cache tất cả kết quả. Trong trường hợp đó, sử dụng SelectMany để gọi phương thức async cho mỗi phần tử, và Replay để bộ nhớ cache (cộng với một Connect để có được những cán bóng):

public class Consumer 
{ 
    private readonly Service _service = new Service(); 

    public IObservable<string> Trigger() 
    { 
     var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100)) 
      .SelectMany(_ => RunAsync()) 
      .Replay(); 
     connectible.Connect(); 
     return connectible; 
    } 

    public Task<string> RunAsync() 
    { 
     return _service.DoAsync(); 
    } 
} 

Tôi đã thay đổi Results tài sản để được trả lại từ phương pháp Trigger thay , mà tôi nghĩ rằng ý nghĩa hơn, vì vậy kiểm tra bây giờ trông giống như:

[Test] 
public async Task Test() 
{ 
    var sut = new Consumer(); 
    var results = sut.Trigger(); 
    var result = await results.FirstAsync(); 
} 
+1

Đó là một giải pháp tuyệt vời. Cảm ơn Stephen. –

+0

Điều gì sẽ xảy ra nếu bạn muốn loại trừ các ngoại lệ từ RunAsync? Những gì tôi nhận được là: obs.SelectMany (x => Observable.FromAsync (() => RunAsync()). Catch (Observable.Empty ())) Có cách nào tốt hơn không? –

+0

@VictorGrigoriu: Tôi khuyên bạn nên đặt câu hỏi của riêng bạn và gắn thẻ nó với 'system.reactive'. –

14

Thay đổi này để:

Observable.Timer(TimeSpan.FromMilliseconds(100)) 
    .SelectMany(async _ => await RunAsync()) 
    .Subscribe(); 

Đăng ký không giữ hoạt động không đồng bộ bên trong Observable.

+1

Cảm ơn Paul, vì đề xuất của bạn. Rất thú vị. Bạn có điều gì mà tôi có thể đọc thêm một chút về điều đó không? –

20

Paul Betts' câu trả lời làm việc trong hầu hết các tình huống, nhưng nếu bạn muốn chặn dòng trong khi chờ đợi cho các chức năng async để kết thúc bạn cần một cái gì đó như thế này:

Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(l => Observable.FromAsync(asyncMethod)) 
      .Concat() 
      .Subscribe(); 

Hoặc:

Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) 
      .Concat() 
      .Subscribe(); 
+1

Đó là một trường hợp sử dụng khác nhưng rất thú vị. –

Các vấn đề liên quan