Để mọi người tiện tham khảo, đây là những gì cuối cùng tôi đã làm. Tôi vẫn đang trong quá trình học Rx và mới quay lại với .Net, lần gần nhất tôi dùng nó là ở phiên bản 2.0. Mọi ý kiến phản hồi đều được đón nhận với lòng biết ơn.
Đối tượng Ticks được sử dụng bên dưới có thể chứa một hoặc nhiều giá trị tick. Dịch vụ dữ liệu lịch sử trả về dữ liệu dưới dạng nhiều đối tượng Ticks.
/// <summary>
/// An <see cref="IPriceFeed"/> that emits the ticks of a historical feed followed by
/// the ticks of a live feed, joined at a cut-off timestamp taken from the clock.
/// </summary>
public class HistoricalAndLivePriceFeed : IPriceFeed
{
    private readonly IPriceFeed history;
    private readonly IPriceFeed live;
    private readonly IClock clock;

    /// <summary>
    /// Creates a feed that uses a <c>RealClock</c> for timing.
    /// </summary>
    /// <param name="history">Feed queried for the historical ticks.</param>
    /// <param name="live">Feed queried for the live ticks.</param>
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live)
        : this(history, live, new RealClock())
    {
    }

    /// <summary>
    /// Creates a feed with an explicit clock.
    /// </summary>
    /// <param name="history">Feed queried for the historical ticks.</param>
    /// <param name="live">Feed queried for the live ticks.</param>
    /// <param name="clock">Supplies the cut-off timestamp and performs the wait.</param>
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock)
    {
        this.history = history;
        this.live = live;
        this.clock = clock;
    }

    /// <summary>
    /// Returns an observable which, on each subscription, starts buffering the live
    /// feed, waits one second, then emits the historical ticks followed by the
    /// buffered and subsequent live ticks.
    /// </summary>
    /// <param name="since">Start time passed to both underlying feeds.</param>
    /// <param name="symbol">Symbol passed to both underlying feeds.</param>
    /// <returns>The concatenated historical-then-live tick stream.</returns>
    public IObservable<Ticks> For(DateTime since, ISymbol symbol)
    {
        return Observable.Create<Ticks>(observer =>
        {
            // Start capturing live ticks first so nothing is missed while history loads.
            var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol));
            try
            {
                var definitelyInHistoricalTicks = clock.Now;
                // Sleep to make sure that historical data overlaps our live data
                // If we ever use a data provider with less fresh historical data, we may need to rethink this
                clock.Wait(TimeSpan.FromSeconds(1));

                // Live side: drop batches that end at or before the cut-off, then trim the
                // remaining batches (presumably to timestamps after the cut-off - see Ticks.RemoveBefore).
                var liveStreamAfterEndOfHistoricalTicks = liveStream
                    .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks)
                    .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1));

                // Historical side: trimmed at the same boundary, then followed by the live ticks.
                var subscription = history.For(since, symbol)
                    .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1))
                    .Concat(liveStreamAfterEndOfHistoricalTicks)
                    .Subscribe(observer);

                // Disposing the result tears down both the live buffer and the combined subscription.
                return liveStream.And(subscription);
            }
            catch
            {
                // Without this the buffer would stay subscribed to the live feed and
                // keep queuing ticks that nobody will ever consume.
                liveStream.Dispose();
                throw;
            }
        });
    }
}
/// <summary>
/// Fluent helpers for pairing a disposable with a second piece of clean-up work.
/// </summary>
public static class CompositeDisposableExtensions
{
    /// <summary>
    /// Combines <paramref name="disposable"/> with an action wrapped as a disposable.
    /// </summary>
    /// <param name="disposable">The first disposable.</param>
    /// <param name="action">Action wrapped via <c>Disposable.Create</c>.</param>
    /// <returns>A composite holding both disposables.</returns>
    public static CompositeDisposable And(this IDisposable disposable, Action action)
    {
        IDisposable wrappedAction = Disposable.Create(action);
        return disposable.And(wrappedAction);
    }

    /// <summary>
    /// Combines two disposables into a single <c>CompositeDisposable</c>.
    /// </summary>
    /// <param name="disposable">The first disposable.</param>
    /// <param name="other">The second disposable.</param>
    /// <returns>A composite holding both disposables.</returns>
    public static CompositeDisposable And(this IDisposable disposable, IDisposable other)
    {
        var members = new[] { disposable, other };
        return new CompositeDisposable(members);
    }
}
Lớp trên sử dụng đoạn mã Rx sau đây, thứ mà tôi vẫn chưa hoàn toàn tin tưởng:
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
namespace My.Rx
{
    /// <summary>
    /// Buffers values from an underlying observable when no observers are subscribed.
    ///
    /// On Subscription, any buffered values will be replayed.
    ///
    /// Only supports one observer for now.
    ///
    /// Buffer is an ISubject for convenience of implementation but IObserver methods
    /// are hidden. It is not intended that Buffer should be used as an IObserver,
    /// except through StartBuffering() and it is dangerous to do so because none of
    /// the IObserver methods check whether Buffer has been disposed.
    /// </summary>
    /// <typeparam name="TSource">Type of the values being buffered.</typeparam>
    public class Buffer<TSource> : ISubject<TSource>, IDisposable
    {
        // Guards every field below.
        private readonly object gate = new object();
        // Values received while no observer was attached.
        private readonly Queue<TSource> queue = new Queue<TSource>();
        private bool isDisposed;
        // Terminal error from the underlying source, kept so it can be replayed.
        private Exception error;
        // True once the underlying source has completed or failed.
        private bool stopped;
        // The single current observer; null means "buffering".
        private IObserver<TSource> observer = null;
        // Subscription to the underlying source.
        private IDisposable subscription;

        /// <summary>
        /// Subscribes to <paramref name="observable"/> immediately and starts buffering its values.
        /// </summary>
        /// <param name="observable">The underlying source.</param>
        /// <returns>A buffer that is already subscribed to the source.</returns>
        public static Buffer<TSource> StartBuffering(IObservable<TSource> observable)
        {
            return new Buffer<TSource>(observable);
        }

        private Buffer(IObservable<TSource> observable)
        {
            subscription = observable.Subscribe(this);
        }

        /// <summary>Queues the value while buffering, otherwise forwards it to the observer.</summary>
        void IObserver<TSource>.OnNext(TSource value)
        {
            lock (gate)
            {
                if (stopped)
                {
                    return;
                }

                if (IsBuffering)
                {
                    queue.Enqueue(value);
                }
                else
                {
                    observer.OnNext(value);
                }
            }
        }

        /// <summary>Records the terminal error and forwards it to the observer if one is attached.</summary>
        void IObserver<TSource>.OnError(Exception error)
        {
            lock (gate)
            {
                if (stopped)
                {
                    return;
                }

                // Always remember the error, even when it is forwarded straight away,
                // so that Subscribe replays OnError (not OnCompleted) to later observers.
                this.error = error;
                stopped = true;

                if (!IsBuffering)
                {
                    observer.OnError(error);
                }
            }
        }

        /// <summary>Records completion and forwards it to the observer if one is attached.</summary>
        void IObserver<TSource>.OnCompleted()
        {
            lock (gate)
            {
                if (stopped)
                {
                    return;
                }

                stopped = true;

                // An attached observer must be told; a buffered completion is
                // replayed by Subscribe instead.
                if (!IsBuffering)
                {
                    observer.OnCompleted();
                }
            }
        }

        /// <summary>
        /// Attaches the single observer, replaying queued values and any terminal
        /// notification before live values are passed through.
        /// </summary>
        /// <param name="observer">The observer to attach.</param>
        /// <returns>A disposable that detaches the observer and resumes buffering.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The buffer has been disposed.</exception>
        /// <exception cref="NotImplementedException">Another observer is already attached.</exception>
        public IDisposable Subscribe(IObserver<TSource> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (gate)
            {
                if (isDisposed)
                {
                    throw new ObjectDisposedException(string.Empty);
                }

                if (this.observer != null)
                {
                    throw new NotImplementedException("A Buffer can currently only support one observer at a time");
                }

                // Replay under the lock so live values cannot overtake buffered ones.
                while (queue.Count > 0)
                {
                    observer.OnNext(queue.Dequeue());
                }

                if (error != null)
                {
                    observer.OnError(error);
                }
                else if (stopped)
                {
                    observer.OnCompleted();
                }

                this.observer = observer;
                return Disposable.Create(() =>
                {
                    lock (gate)
                    {
                        // Go back to buffering
                        this.observer = null;
                    }
                });
            }
        }

        private bool IsBuffering
        {
            get { return observer == null; }
        }

        /// <summary>
        /// Unsubscribes from the underlying source and detaches any observer.
        /// Calling it more than once has no further effect.
        /// </summary>
        public void Dispose()
        {
            lock (gate)
            {
                if (isDisposed)
                {
                    return;
                }

                isDisposed = true;

                if (subscription != null)
                {
                    subscription.Dispose();
                    subscription = null;
                }

                observer = null;
            }
        }
    }
}
Đoạn mã này vượt qua được các bài kiểm tra sau (tôi chưa bận tâm đến việc kiểm tra tính an toàn luồng):
// Shared exception instance: tests push it through the underlying source and then
// assert that the very same exception is what the observer receives.
private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world");
// Values emitted before anyone subscribes are replayed, in order, to the first subscriber.
[Test]
public void ReplaysBufferedValuesToFirstSubscriber()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    // Nobody is subscribed yet, so both values must be queued.
    underlying.OnNext(1);
    underlying.OnNext(2);

    var observed = new List<int>();
    buffer.Subscribe(Observer.Create<int>(observed.Add));

    // EqualTo compares element by element, so the replay order is checked too
    // (EquivalentTo would also accept { 2, 1 }).
    Assert.That(observed, Is.EqualTo(new[] { 1, 2 }));
}
// Values emitted while an observer is attached are passed straight through, in order.
[Test]
public void PassesNewValuesToObserver()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    var observed = new List<int>();
    buffer.Subscribe(Observer.Create<int>(observed.Add));

    underlying.OnNext(1);
    underlying.OnNext(2);

    // EqualTo (unlike EquivalentTo) also verifies the order of arrival.
    Assert.That(observed, Is.EqualTo(new[] { 1, 2 }));
}
// Once a subscription has been disposed, its observer receives nothing further.
[Test]
public void DisposesOfSubscriptions()
{
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);
    var received = new List<int>();

    // Subscribe and unsubscribe straight away.
    var handle = buffer.Subscribe(Observer.Create<int>(received.Add));
    handle.Dispose();

    source.OnNext(1);

    Assert.That(received, Is.Empty);
}
// After the observer detaches, the buffer queues values again and replays them,
// in order, to the next observer.
[Test]
public void StartsBufferingAgainWhenSubscriptionIsDisposed()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    // These should be buffered
    underlying.OnNext(1);
    underlying.OnNext(2);

    var firstSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add)))
    {
        // Should be passed through to first subscription
        underlying.OnNext(3);
    }

    // EqualTo checks that replayed values precede the live one.
    Assert.That(firstSubscriptionObserved, Is.EqualTo(new[] { 1, 2, 3 }));

    // First subscription has been disposed -
    // we should be back to buffering again
    underlying.OnNext(4);
    underlying.OnNext(5);

    var secondSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add)))
    {
        // Should be passed through to second subscription
        underlying.OnNext(6);
    }

    Assert.That(secondSubscriptionObserved, Is.EqualTo(new[] { 4, 5, 6 }));
}
// A second Subscribe while an observer is still attached is rejected.
[Test]
public void DoesNotSupportTwoConcurrentObservers()
{
    // Use .Publish() if you need to do this
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);
    var firstObserver = Observer.Create<int>(i => { });
    var secondObserver = Observer.Create<int>(i => { });

    buffer.Subscribe(firstObserver);

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(secondObserver));
}
// Subscribing to a disposed buffer throws ObjectDisposedException.
[Test]
public void CannotBeUsedAfterDisposal()
{
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);
    var lateObserver = Observer.Create<int>(i => { });

    buffer.Dispose();

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(lateObserver));
}
// An error raised while buffering is replayed after the buffered values.
[Test]
public void ReplaysBufferedError()
{
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);
    source.OnNext(1);
    source.OnError(exceptionThrownFromUnderlying);

    var received = new List<int>();
    Exception caught = null;
    Action<Exception> rememberError = e => caught = e;
    buffer.Subscribe(received.Add, rememberError);

    Assert.That(received, Is.EquivalentTo(new[] { 1 }));
    Assert.That(caught, Is.EqualTo(exceptionThrownFromUnderlying));
}
// A completion seen while buffering is replayed after the buffered values.
[Test]
public void ReplaysBufferedCompletion()
{
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);
    source.OnNext(1);
    source.OnCompleted();

    var received = new List<int>();
    var sawCompletion = false;
    Action markCompleted = () => sawCompletion = true;
    buffer.Subscribe(received.Add, markCompleted);

    Assert.That(received, Is.EquivalentTo(new[] { 1 }));
    Assert.True(sawCompletion);
}
// The buffered error is replayed to every later observer, not only the first one,
// while the already-drained values are not.
[Test]
public void ReplaysBufferedErrorToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);

    // Drain value queue: subscribe and detach straight away. An explicit Dispose()
    // replaces the former "using (...) ;", whose empty statement triggers CS0642.
    buffer.Subscribe(Observer.Create<int>(i => { }, e => { })).Dispose();

    var observed = new List<int>();
    Exception exceptionEncountered = null;
    buffer.Subscribe(Observer.Create<int>(observed.Add, e => exceptionEncountered = e)).Dispose();

    Assert.That(observed, Is.Empty);
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying));
}
// The buffered completion is replayed to every later observer, not only the first one,
// while the already-drained values are not.
[Test]
public void ReplaysBufferedCompletionToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnCompleted();

    // Drain value queue: subscribe and detach straight away. An explicit Dispose()
    // replaces the former "using (...) ;", whose empty statement triggers CS0642.
    buffer.Subscribe(Observer.Create<int>(i => { })).Dispose();

    var observed = new List<int>();
    var completed = false;
    buffer.Subscribe(Observer.Create<int>(observed.Add, () => completed = true)).Dispose();

    Assert.That(observed, Is.Empty);
    Assert.True(completed);
}
// Disposing the buffer must also dispose its subscription to the underlying source.
[Test]
public void DisposingOfBufferDisposesUnderlyingSubscription()
{
    var sourceSubscriptionDisposed = false;
    // A source that never emits; it only records when its subscription is disposed.
    Func<IObserver<int>, IDisposable> subscribe =
        observer => Disposable.Create(() => sourceSubscriptionDisposed = true);
    var source = Observable.Create<int>(subscribe);

    var buffer = Buffer<int>.StartBuffering(source);
    buffer.Dispose();

    Assert.True(sourceSubscriptionDisposed);
}
Liệu 'Switch()' có dùng được ở đây không? Ví dụ: 'history.Switch(live)' – AlexFoxGill