2012-05-31 39 views
7

Tôi có một quan sát đại diện cho một dòng giá cổ phiếu. Nếu không có người quan sát nào trong chuỗi quan sát của tôi, tôi muốn có thể ngắt kết nối từ máy chủ từ xa đang cung cấp luồng giá, nhưng tôi không muốn làm điều đó cho đến khi mọi người quan sát được gọi là Dispose(). Sau đó, trong một thời trang tương tự, khi người đầu tiên gọi Đăng ký Tôi muốn kết nối lại với máy chủ từ xa.Theo dõi số quan sát viên (số lượng) trong một Quan sát?

Có cách nào để tìm ra số lượng người quan sát đã gọi là đăng ký trên một quan sát được không? Hoặc có lẽ một cách để biết khi nào các nhà quan sát đang gọi đến Đăng ký hoặc Vứt bỏ?

Trả lời

3

IObservable<T>interface mà bạn có thể triển khai. Trong phương thức Đăng ký của giao diện, bạn có thể theo dõi các nhà quan sát bằng cách duy trì một danh sách nội bộ.

Đoạn mã sau đây là từ MSDN.

private List<IObserver<Location>> observers; 

public IDisposable Subscribe(IObserver<Location> observer) 
{ 
    if (! observers.Contains(observer)) 
     observers.Add(observer); 

    // ------- If observers.Count == 1 create connection. ------- 

    return new Unsubscriber(observers, observer); 
} 
private class Unsubscriber : IDisposable 
{ 
    private List<IObserver<Location>>_observers; 
    private IObserver<Location> _observer; 

    public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer) 
    { 
     this._observers = observers; 
     this._observer = observer; 
    } 

    public void Dispose() 
    { 
     if (_observer != null && _observers.Contains(_observer)) 
     _observers.Remove(_observer); 
     // ----------- if observers.Count == 0 close connection ----------- 
    } 
} 
+0

Vâng, tôi đã nhận ra đây là cách tôi sẽ phải làm điều đó. Tôi đã hy vọng tôi chỉ có thể tận dụng một trong những đối tượng được xây dựng trong, nhưng có vẻ như tôi sẽ phải bọc một trong những (khả năng nhất BehaviorSubject) để tôi có thể theo dõi các thuê bao. –

+0

Giải pháp này không cung cấp bất kỳ chủ đề an toàn nào. Nó sẽ cần một chút công việc trước khi đi vào sản xuất. – Enigmativity

9

Tôi chỉ đơn giản sử dụng RefCount/Publish. Tôi luôn luôn cảm thấy như tôi đang thực hiện IObservable Tôi đang làm việc cách quá khó.

myColdObservable.Publish().RefCount(); 

Điều này sẽ làm cho dừng xung quan sát được của bạn sau khi mọi người đã ngắt kết nối. Dưới đây là một ví dụ:

var coldObservable = Observable 
    .Interval(TimeSpan.FromSeconds(1)) 
    .ObserveOn(Scheduler.TaskPool) 
    .Select(_ => DoSomething()); 

var refCountObs = coldObservable.Publish().RefCount(); 

CompositeDisposable d = new CompositeDisposable(); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n))); 

//Wait a bit for work to happen 
System.Threading.Thread.Sleep(10000); 

//Everyone unsubscribes 
d.Dispose(); 

//Observe that DoSomething is not called. 
System.Threading.Thread.Sleep(3000); 

này không bao gồm trường hợp bạn thực sự muốn biết số lượng thuê bao, nhưng tôi nghĩ rằng điều này phù hợp với yêu cầu của bạn dừng lại làm việc nếu không có thuê bao.

+0

Cách tiếp cận này không cung cấp cho bạn số lượng người đăng ký nhưng nó dừng nguồn có thể quan sát được khi tất cả người đăng ký đã hoàn tất. Tốt hơn nhiều so với thực hiện quan sát của riêng bạn. – Enigmativity

+0

Đây là câu trả lời hay nhất –

3

Nói chung, không triển khai IObservable; thường đã có soemthing trong Rx có thể giúp bạn ra ngoài, hoặc trực tiếp hoặc thông qua thành phần. Nếu bạn phải thực hiện IObservable, hãy sử dụng Observable.Create để làm như vậy, để có được tất cả các yêu cầu được bảo đảm cho hợp đồng quan sát viên, v.v.

Về vấn đề của bạn - đề xuất sử dụng Publish và RefCount chính xác là thành phần Bạn đang tìm. Nếu bạn muốn tự tính vì một số lý do, hãy sử dụng Observable.Defer để chặn các đăng ký, có thể với Observable.Finally để chặn các kết thúc chuỗi. Hoặc, bọc nguồn bằng Observable.Create, chuyển người quan sát đến trình tự được bao bọc và bọc IDisposable trả về với logic đếm (sử dụng Disposable.Create).

Chúc mừng,

-Bart (Rx đội)

4

Bit của một cũ nhưng tôi đã xem qua bài viết này như tôi đã có một vấn đề mà tôi cần phải biết số lượng thuê bao. Sử dụng gợi ý của Bart tôi đã đưa ra phần mở rộng này.

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged) 
{ 
int count = 0; 

return Observable.Defer(() => 
{ 
    count = Interlocked.Increment(ref count); 
    countChanged(count); 
    return source.Finally(() => 
    { 
     count = Interlocked.Decrement(ref count); 
     countChanged(count); 
    }); 
}); 
} 
Các vấn đề liên quan