2015-11-14 15 views
8

Tôi phải truy vấn cơ sở dữ liệu một cách kịp thời để biết trạng thái của hệ thống cũ. Tôi đã nghĩ đến việc gói truy vấn xung quanh một Observable, nhưng tôi không biết cách chính xác để làm điều đó.Cơ sở dữ liệu bỏ phiếu với Tiện ích mở rộng phản ứng

Về cơ bản, nó sẽ là cùng một truy vấn sau mỗi 5 giây. Nhưng tôi sợ rằng tôi sẽ phải đối mặt với những vấn đề này:

  • Điều gì xảy ra nếu truy vấn thực hiện mất 10 giây? Tôi không muốn thực hiện bất kỳ truy vấn mới nào nếu truy vấn trước đó vẫn đang được xử lý.
  • Ngoài ra, sẽ có thời gian chờ. Nếu truy vấn hiện tại không thực thi sau, ví dụ: 20 giây, một thông báo có thông tin phải là đã đăng nhập và một lần thử mới (cùng một truy vấn) sẽ được gửi đi.

chi tiết thêm:

  • Truy vấn chỉ là một SELECT trả về một tập dữ liệu với một danh sách các mã trạng thái (làm việc, faulted).
  • Chuỗi quan sát sẽ luôn nhận dữ liệu mới nhất nhận được từ truy vấn, giống như phương pháp tiện ích Chuyển đổi.
  • Tôi muốn quấn truy vấn cơ sở dữ liệu (hoạt động lenghty) vào một nhiệm vụ, nhưng tôi không chắc chắn nếu đó là lựa chọn tốt nhất.

Tôi gần như chắc chắn rằng truy vấn sẽ được thực hiện trong một chuỗi khác, nhưng tôi không biết làm thế nào quan sát được như thế nào, từng đọc Introduction to Rx by Lee Campbell.

+0

Bạn có thể thêm chi tiết khác không? Truy vấn nào lấy lại dữ liệu? truy vấn trả về một đối tượng đơn lẻ? trong trường hợp hết thời gian chờ bạn nói rằng bạn muốn một truy vấn mới được khởi chạy, truy vấn đó là gì? –

Trả lời

14

Đây là trường hợp khá cổ điển khi sử dụng Rx để thăm dò ý kiến ​​hệ thống khác. Hầu hết mọi người sẽ sử dụng Observable.Interval làm nhà điều hành go-to của họ và hầu hết mọi người sẽ ổn.

Tuy nhiên, bạn có các yêu cầu cụ thể về thời gian chờ và thử lại. Trong trường hợp này, tôi nghĩ rằng bạn là nên sử dụng một sự kết hợp của các nhà khai thác:

  • Observable.Timer để cho phép bạn thực hiện truy vấn của bạn trong một thời gian xác định
  • Timeout để xác định và truy vấn cơ sở dữ liệu đã tràn ngập
  • ToObservable() để ánh xạ các kết quả Task của bạn đến một chuỗi có thể quan sát được.
  • Retry để cho phép bạn khôi phục sau khi hết giờ
  • Repeat để cho phép bạn tiếp tục sau khi truy vấn cơ sở dữ liệu thành công. Điều này cũng sẽ giữ khoảng thời gian/khoảng cách ban đầu giữa việc hoàn thành truy vấn cơ sở dữ liệu trước đó và bắt đầu truy vấn tiếp theo.

đoạn này làm việc LINQPad sẽ hiển thị cho bạn những truy vấn hoạt động đúng:

void Main() 
{ 
    var pollingPeriod = TimeSpan.FromSeconds(5); 
    var dbQueryTimeout = TimeSpan.FromSeconds(10); 

    //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence. 
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout; 

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" }); 

    var query = Observable.Timer(pollingPeriod, scheduler) 
        .SelectMany(_ => DatabaseQuery().ToObservable()) 
        .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
        .Retry() //Loop on errors 
        .Repeat(); //Loop on success 

    query.StartWith("Seed") 
     .TimeInterval(scheduler) //Just to debug, print the timing gaps. 
     .Dump(); 
} 

// Define other methods and classes here 
private static int delay = 9; 
private static int delayModifier = 1; 
public async Task<string> DatabaseQuery() 
{ 
    //Oscillate the delay between 3 and 12 seconds 
    delay += delayModifier; 
    var timespan = TimeSpan.FromSeconds(delay); 
    if (delay < 4 || delay > 11) 
     delayModifier *= -1; 
    timespan.Dump("delay"); 
    await Task.Delay(timespan); 
    return "Value"; 
} 

Kết quả như sau:

Seed 00:00:00.0125407 
Timeout 00:00:15.0166379 
Timeout 00:00:15.0124480 
Timeout 00:00:15.0004520 
Timeout 00:00:15.0013296 
Timeout 00:00:15.0140864 
Value 00:00:14.0251731 
Value 00:00:13.0231958 
Value 00:00:12.0162236 
Value 00:00:11.0138606 

Phần chính của mẫu là ....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler) 
       .SelectMany(_ => DatabaseQuery().ToObservable()) 
       .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
       .Retry() //Loop on errors 
       .Repeat(); //Loop on success 

EDIT: Dưới đây là giải thích thêm về cách đi đến giải pháp này. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

+0

Mục đích chính của việc sử dụng EventLoopScheduler ở đây để đảm bảo truy vấn chạy trên cùng một chuỗi? Đây có phải là cách tốt nhất để đi khi bỏ phiếu cho một hệ thống khác bằng RX? – jumpercake

+0

Đúng, đó là ý định ở đây. Trong trường hợp này tôi sẽ đề nghị làm như vậy để bạn không phải cạnh tranh với Task/Thread Pool. Khi tôi đặt tên cho Chủ đề trong ví dụ này, hầu hết các sản phẩm khai thác gỗ cũng sẽ phơi bày điều đó. Tuy nhiên, sử dụng EventLoopScheduler ở đây không phải là bắt buộc, Rx sẽ duy trì việc tuần tự hóa công việc. –

+0

Đã cập nhật để bao gồm liên kết đến mã mẫu với các hoạt động để đến lúc triển khai –

1

Tôi nghĩ rằng đây là những gì bạn nên làm:

var query = 
    from n in Observable.Interval(TimeSpan.FromSeconds(5.0)) 
    from ds in Observable.Amb(
     Observable.Start(() => /* Your DataSet query */), 
     Observable 
      .Timer(TimeSpan.FromSeconds(10.0)) 
      .Select(_ => new DataSet("TimeOut"))) 
    select ds; 

này gây nên một truy vấn mới với một khoảng thời gian giữa hành 5 giây. Không phải 5 giây kể từ lần cuối cùng bắt đầu, 5 giây kể từ lần cuối cùng kết thúc.

Sau đó, bạn thử truy vấn của mình, nhưng bạn .Amb với bộ hẹn giờ trả lại số đặc biệt DataSet sau 10 giây. Nếu truy vấn của bạn kết thúc trước 10 giây thì nó sẽ thắng, nhưng nếu không thì trả lại DataSet đặc biệt. Các nhà điều hành .Amb về cơ bản là một nhà điều hành "cuộc đua" - người đầu tiên quan sát được để tạo ra một giá trị thắng.

+0

Wow, sự kết hợp của các quan sát là khá ấn tượng! Bạn có nghĩa là cả hai quan sát sẽ chạy đua và Amb sẽ có người đầu tiên đến. Những gì mất tôi là 2 lồng nhau từ clasues. Có phải đó là phần cho phép phần mà bạn nói "Điều này kích hoạt một truy vấn mới với khoảng thời gian giữa các lần thực hiện 5 giây. Không phải 5 giây kể từ lần cuối cùng bắt đầu, 5 giây kể từ lần cuối cùng kết thúc."? – SuperJMN

+1

@SuperJMN - Cảm ơn. 'Observable.Interval (TimeSpan.FromSeconds (5.0))' chỉ kích hoạt sau 5 giây sau khi tất cả các thuê bao đã hoàn thành công việc của họ. Vì vậy, nếu phần thứ hai của truy vấn đang hoạt động, khoảng thời gian sẽ không kích hoạt cho đến 5 giây sau khi hoàn thành. – Enigmativity

+0

Tôi _think_ khi bạn đang sử dụng 'SelectMany' để tiêu thụ' Observable.Interval' của bạn mà không có gì là _blocking_ mức tiêu thụ, vì vậy nó sẽ đánh dấu mỗi 5 giây, tức là không một khi truy vấn DB đã hoàn tất. –

Các vấn đề liên quan