Đây là trường hợp khá cổ điển khi sử dụng Rx để thăm dò ý kiến hệ thống khác. Hầu hết mọi người sẽ sử dụng Observable.Interval
làm nhà điều hành go-to của họ và hầu hết mọi người sẽ ổn.
Tuy nhiên, bạn có các yêu cầu cụ thể về thời gian chờ và thử lại. Trong trường hợp này, tôi nghĩ rằng bạn là nên sử dụng một sự kết hợp của các nhà khai thác:
Observable.Timer
để cho phép bạn thực hiện truy vấn của bạn trong một thời gian xác định
Timeout
để xác định và truy vấn cơ sở dữ liệu đã tràn ngập
ToObservable()
để ánh xạ các kết quả Task
của bạn đến một chuỗi có thể quan sát được.
Retry
để cho phép bạn khôi phục sau khi hết giờ
Repeat
để cho phép bạn tiếp tục sau khi truy vấn cơ sở dữ liệu thành công. Điều này cũng sẽ giữ khoảng thời gian/khoảng cách ban đầu giữa việc hoàn thành truy vấn cơ sở dữ liệu trước đó và bắt đầu truy vấn tiếp theo.
đoạn này làm việc LINQPad sẽ hiển thị cho bạn những truy vấn hoạt động đúng:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
Kết quả như sau:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
Phần chính của mẫu là ....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
EDIT: Dưới đây là giải thích thêm về cách đi đến giải pháp này. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md
Nguồn
2015-11-16 16:04:43
Bạn có thể thêm chi tiết khác không? Truy vấn nào lấy lại dữ liệu? truy vấn trả về một đối tượng đơn lẻ? trong trường hợp hết thời gian chờ bạn nói rằng bạn muốn một truy vấn mới được khởi chạy, truy vấn đó là gì? –