2016-01-25 21 views
11

Tôi có một bộ đếm thời gian rất đơn giản có thể quan sát được và tôi muốn bắt đầu/dừng truyền mà không ngắt kết nối thuê bao (mà phải ngồi và đợi bất kể trạng thái quan sát được). Có thể, và nếu như vậy làm thế nào?làm thế nào có thể bắt đầu và dừng một khoảng thời gian quan sát được trong RXJS?

var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .take(10); 

    var subscription = source.subscribe(
    function (x) { 
    $("#result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#result").append('Error: ' + err); 
    }, 
    function() { 
    $("#result").append('Completed'); 
    }); 

nhận xét chung: hầu hết các ví dụ ive đều cho thấy cách xác định quan sát và người đăng ký. làm thế nào để tôi ảnh hưởng đến hành vi của các đối tượng hiện có?

Trả lời

14

Phụ thuộc vào nguồn tín hiệu dừng/tiếp tục là gì. Cách đơn giản nhất mà tôi có thể nghĩ đến là với pausable operator, vì tài liệu này cho biết hoạt động tốt hơn với các quan sát nóng. Vì vậy, trong đoạn mã mẫu sau, tôi đã xóa take(10) (tín hiệu tạm dừng của bạn hiện đã đi qua chủ đề pauser) và thêm share để biến thiết bị của bạn thành một thiết bị nóng.

var pauser = new Rx.Subject(); 
var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .share() 
    .pausable(pauser); 

var subscription = source.subscribe(
    function (x) { 
    $("#result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#result").append('Error: ' + err); 
    }, 
    function() { 
    $("#result").append('Completed'); 
}); 

    // To begin the flow 
pauser.onNext(true); // or source.resume(); 

// To pause the flow at any point 
pauser.onNext(false); // or source.pause(); 

Đây là một more sophisticated example đó sẽ tạm dừng nguồn của bạn mỗi 10 hạng mục:

// Helper functions 
function emits (who, who_) {return function (x) { 
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n"); 
};} 

var pauser = new Rx.Subject(); 
var source = Rx.Observable 
    .interval(500) 
    .timeInterval() 
    .map(function (x) { return x.value + ':' + x.interval; }) 
    .share(); 
var pausableSource = source 
    .pausable(pauser); 

source 
    .scan(function (acc, _){return acc+1}, 0) 
    .map(function(counter){return !!(parseInt(counter/10) % 2)}) 
    .do(emits(ta_validation, 'scan')) 
    .subscribe(pauser); 

var subscription = pausableSource.subscribe(
    function (x) { 
    $("#ta_result").append('Next: ' + x + ' '); 
    }, 
    function (err) { 
    $("#ta_result").append('Error: ' + err); 
    }, 
    function() { 
    $("#ta_result").append('Completed'); 
}); 

Bạn sẽ trông như thế câu trả lời của bạn cho câu hỏi thứ hai. Kết hợp các quan sát bạn được cung cấp với các nhà khai thác RxJS có liên quan để nhận ra trường hợp sử dụng của bạn. Đây là những gì tôi đã làm ở đây.

+1

cảm ơn câu trả lời của bạn. ive thêm một nút để tạm dừng/chơi và tôi nhận thấy rằng các dòng được đặt lại trên mỗi sơ yếu lý lịch để nó không làm chính xác những gì tôi cần mà là một đóng băng/tiếp tục mà không cần khởi động lại. –

+0

Bạn có biết sự khác biệt giữa nóng và lạnh có thể quan sát được không. Có một cái nhìn ở đây: http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444. Nếu luồng của bạn đang đặt lại, điều đó có thể có nghĩa là bạn đang sử dụng một quan sát lạnh có nghĩa là có thể quan sát được 'đặt lại' mỗi lần có một người đăng ký mới. Cách duy nhất để đảm bảo là xem mã của bạn. Trong ví dụ của tôi, tôi đã thêm một '.share()' để đảm bảo luồng đã nóng trước khi tôi chuyển nó cho toán tử 'pauseable'. Nếu không, tôi sẽ có cùng một vấn đề mà bạn có bây giờ. – user3743222

Các vấn đề liên quan