2017-02-12 21 views
8

Tôi không thể tìm ra cách hoạt động của publishReplay().refCount().rxjs 5 publishReplay refCount

Ví dụ (https://jsfiddle.net/7o3a45L1/):

var source = Rx.Observable.create(observer => { 
    console.log("call"); 
    // expensive http request 
    observer.next(5); 
}).publishReplay().refCount(); 

subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)}); 
subscription1.unsubscribe(); 
console.log(""); 

subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)}); 
subscription2.unsubscribe(); 
console.log(""); 

subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)}); 
subscription3.unsubscribe(); 
console.log(""); 

subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)}); 
subscription4.unsubscribe(); 

cho kết quả sau:

gọi observerA: 5

observerB: 5 cuộc gọi observerB: 5

observerC: 5 người quan sátC: 5 người quan sát cuộc gọiC: 5

observerD: 5 observerD: 5 observerD: 5 cuộc gọi observerD: 5

1) Tại sao observerB, C và D gọi là nhiều lần?

2) Tại sao "cuộc gọi" được in trên mỗi dòng chứ không phải ở đầu dòng?

Ngoài ra, nếu tôi gọi publishReplay(1).refCount(), nó gọi người quan sátB, C và D 2 lần mỗi.

Điều tôi mong đợi là mọi người quan sát mới nhận giá trị 5 chính xác một lần và "cuộc gọi" chỉ được in một lần.

Trả lời

13

publishReplay(x).refCount() kết hợp nào sau đây:

  • Nó tạo ra một ReplaySubject mà phát lại lên đến x khí thải. Nếu x không được xác định thì nó sẽ phát lại toàn bộ luồng.
  • Điều này làm cho ReplaySubject tương thích đa hướng này sử dụng toán tử refCount(). Điều này dẫn đến đồng thời đăng ký nhận cùng mức phát thải.

Ví dụ của bạn chứa một vài vấn đề liên quan đến cách hoạt động của nó. Xem đoạn điều chỉnh sau:

var state = 5 
 
var realSource = Rx.Observable.create(observer => { 
 
    console.log("creating expensive HTTP-based emission"); 
 
    observer.next(state++); 
 
// observer.complete(); 
 
    
 
    return() => { 
 
    console.log('unsubscribing from source') 
 
    } 
 
}); 
 

 

 
var source = Rx.Observable.of('') 
 
    .do(() => console.log('stream subscribed')) 
 
    .ignoreElements() 
 
    .concat(realSource) 
 
.do(null, null,() => console.log('stream completed')) 
 
.publishReplay() 
 
.refCount() 
 
; 
 
    
 
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)}); 
 
subscription1.unsubscribe(); 
 
    
 
subscription2 = source.subscribe(v => console.log('observerB: ' + v)); 
 
subscription2.unsubscribe(); 
 
    
 
subscription3 = source.subscribe(v => console.log('observerC: ' + v)); 
 
subscription3.unsubscribe(); 
 
    
 
subscription4 = source.subscribe(v => console.log('observerD: ' + v)); 
 
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

Khi chạy đoạn này chúng ta có thể thấy rõ rằng nó không được phát ra các giá trị trùng lặp cho Observer D, nó thực chất là tạo ra khí thải mới cho mỗi thuê bao . Làm thế nào mà?

Mọi đăng ký được hủy đăng ký trước khi đăng ký tiếp theo diễn ra. Điều này có hiệu quả làm cho refCount giảm trở lại bằng không, không có multicasting đang được thực hiện.

Sự cố nằm trong thực tế là luồng realSource không hoàn thành. Bởi vì chúng tôi không multicasting các thuê bao tiếp theo được một trường hợp mới của realSource thông qua ReplaySubject và phát thải mới được thêm vào trước với các phát thải đã được phát thải trước đó.

Vì vậy, để khắc phục luồng của bạn từ việc gọi yêu cầu HTTP đắt tiền nhiều lần, bạn phải hoàn thành luồng để publishReplay biết rằng nó không cần phải đăng ký lại.

4

Điều này xảy ra vì bạn đang sử dụng publishReplay(). Nó trong nội bộ tạo ra một thể hiện của ReplaySubject lưu trữ tất cả các giá trị đi qua.

Vì bạn đang sử dụng Observable.create nơi bạn phát ra một giá trị duy nhất thì mỗi lần bạn gọi source.subscribe(...) bạn nối thêm một giá trị vào bộ đệm trong ReplaySubject.

Bạn đang không nhận được call in ở đầu của mỗi dòng bởi vì đó là ReplaySubject người phát ra đệm của nó đầu tiên khi bạn đăng ký và sau đó nó đặt mua bản thân để nguồn của nó:

Để biết chi tiết thi hành xem:

Điều tương tự cũng áp dụng khi sử dụng publishReplay(1). Đầu tiên nó phát ra mục đệm từ ReplaySubject và sau đó thêm một mục từ observer.next(5);

+0

Câu trả lời duy nhất có liên quan –

4

chung: Các refCount phương tiện, mà suối nóng/chia sẻ miễn là có ít nhất 1 thuê bao - tuy nhiên, nó đang được thiết lập lại/lạnh khi không có người đăng ký.

Điều này có nghĩa là nếu bạn muốn hoàn toàn chắc chắn rằng không có gì được thực hiện nhiều lần, bạn không nên sử dụng refCount() mà chỉ đơn giản là connect luồng để đặt nóng.

Lưu ý bổ sung: Nếu bạn thêm observer.complete() sau observer.next(5);, bạn cũng sẽ nhận được kết quả mong đợi.


Sidenote: Bạn có thực sự cần tạo tùy chỉnh của riêng mình Obervable tại đây không? Trong 95% các trường hợp, các toán tử hiện có là đủ cho các usecase đã cho.

Các vấn đề liên quan