2013-02-11 28 views
10

Tôi đang thử Rx vì nó có vẻ phù hợp với miền của chúng tôi nhưng đường cong học tập đã khiến tôi ngạc nhiên.Hợp nhất dữ liệu giá cổ phiếu trong quá khứ và sống với Rx

Tôi cần kết hợp dữ liệu giá lịch sử với dữ liệu giá trực tiếp.

Tôi đang cố gắng để thích nghi với phương pháp thông thường để làm điều này sang ngôn ngữ của Rx:

  1. Theo dõi giá sống ngay lập tức và bắt đầu đệm các giá trị tôi nhận được lại
  2. Tiến hành một yêu cầu lịch sử dữ liệu giá (điều này cần phải xảy ra sau khi đăng ký giá trực tiếp để chúng tôi không có bất kỳ khoảng trống nào trong dữ liệu của chúng tôi)
  3. Xuất bản giá lịch sử khi họ quay lại
  4. Sau khi chúng tôi nhận được tất cả dữ liệu lịch sử, hãy xuất bản bộ đệm dữ liệu trực tiếp, xóa mọi giá trị trùng lắp với các dữ liệu lịch sử của chúng tôi ngay từ đầu
  5. Tiếp tục phát lại dữ liệu từ các thức ăn giá sống

tôi có mã người đàn ông rơm kinh tởm và không chính xác này mà dường như làm việc cho các trường hợp thử nghiệm ngây thơ tôi đã viết:

IConnectableObservable<Tick> live = liveService 
    .For(symbol) 
    .Replay(/* Some appropriate buffer size */); 
live.Connect(); 

IObservable<Tick> historical = historyService.For(since, symbol); 

return new[] {historical, live} 
    .Concat() 
    .Where(TicksAreInChronologicalOrder()); 

private static Func1<Tick,bool> TicksAreInChronologicalOrder() 
{ 
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw 
} 

này có một vài hạn chế

  1. Các phù hợp kích thước replay đệm không được biết đến. Đặt bộ đệm không giới hạn là không thể, đây là một chuỗi dài. Thực sự chúng tôi muốn một số loại bộ đệm một lần làm nổi bật cuộc gọi đầu tiên tới Đăng ký. Nếu điều này tồn tại trong Rx, tôi không thể tìm thấy nó.
  2. Bộ đệm phát lại sẽ tiếp tục tồn tại ngay cả khi chúng tôi đã chuyển sang xuất bản giá trực tiếp. Chúng tôi không cần bộ đệm tại thời điểm này.
  3. Tương tự, biến vị ngữ để lọc các dấu chồng chéo không cần thiết khi chúng tôi bỏ qua sự chồng chéo ban đầu giữa giá lịch sử và giá trực tiếp. Tôi thực sự muốn làm một cái gì đó như: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Wait(this IObservable<TSource>) có hữu ích ở đây không?

Phải có cách tốt hơn để làm điều này, nhưng tôi vẫn đang chờ bộ não của mình rên Rx như FP.

Một tùy chọn khác mà tôi đã xem xét để giải quyết 1. Tôi đang viết phần mở rộng Rx của riêng mình là ISubject xếp hàng đợi cho đến khi nó được thuê bao đầu tiên (và từ chối người đăng ký sau đó?). Có lẽ đó là con đường để đi?

+0

'Switch()' có hoạt động ở đây không? Như trong: 'history.Switch (live)' – AlexFoxGill

Trả lời

1

Đối với hồ sơ, đây là những gì tôi đã làm cuối cùng. Tôi vẫn còn rất nhiều người học Rx, và quay trở lại .Net lần cuối thấy nó ở phiên bản 2.0. Tất cả các thông tin phản hồi là rất biết ơn nhận được.

Đối tượng Ticks được sử dụng bên dưới có thể chứa một hoặc nhiều giá trị đánh dấu. Dịch vụ dữ liệu lịch sử trả về dữ liệu trong một số Ticks.

public class HistoricalAndLivePriceFeed : IPriceFeed 
{ 
    private readonly IPriceFeed history; 
    private readonly IPriceFeed live; 
    private readonly IClock clock; 

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live) 
:   this(history, live, new RealClock()) 
     { 
    } 
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock) 
    { 
     this.history = history; 
     this.live = live; 
     this.clock = clock; 
    } 

    public IObservable<Ticks> For(DateTime since, ISymbol symbol) 
    { 
     return Observable.Create<Ticks>(observer => 
     { 
      var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol)); 

      var definitelyInHistoricalTicks = clock.Now; 
      // Sleep to make sure that historical data overlaps our live data 
      // If we ever use a data provider with less fresh historical data, we may need to rethink this 
      clock.Wait(TimeSpan.FromSeconds(1)); 

      var liveStreamAfterEndOfHistoricalTicks = liveStream 
       .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks) 
       .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1)); 

      var subscription = history.For(since, symbol) 
       .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1)) 
       .Concat(liveStreamAfterEndOfHistoricalTicks) 
       .Subscribe(observer); 

      return liveStream.And(subscription); 
     }); 
    } 
} 
public static class CompositeDisposableExtensions 
{ 
    public static CompositeDisposable And(this IDisposable disposable, Action action) 
    { 
     return And(disposable, Disposable.Create(action)); 
    } 

    public static CompositeDisposable And(this IDisposable disposable, IDisposable other) 
    { 
     return new CompositeDisposable(disposable, other); 
    } 
} 

nào sử dụng mã Rx này, mà tôi vẫn không hoàn toàn tin tưởng:

using System; 
using System.Collections.Generic; 
using System.Reactive.Disposables; 
using System.Reactive.Subjects; 

namespace My.Rx 
{ 
    /// <summary> 
    /// Buffers values from an underlying observable when no observers are subscribed. 
    /// 
    /// On Subscription, any buffered values will be replayed. 
    /// 
    /// Only supports one observer for now. 
    /// 
    /// Buffer is an ISubject for convenience of implementation but IObserver methods 
    /// are hidden. It is not intended that Buffer should be used as an IObserver, 
    /// except through StartBuffering() and it is dangerous to do so because none of 
    /// the IObserver methods check whether Buffer has been disposed. 
    /// </summary> 
    /// <typeparam name="TSource"></typeparam> 
    public class Buffer<TSource> : ISubject<TSource>, IDisposable 
    { 
     private readonly object gate = new object(); 
     private readonly Queue<TSource> queue = new Queue<TSource>(); 

     private bool isDisposed; 
     private Exception error; 
     private bool stopped; 
     private IObserver<TSource> observer = null; 
     private IDisposable subscription; 

     public static Buffer<TSource> StartBuffering(IObservable<TSource> observable) 
     { 
      return new Buffer<TSource>(observable); 
     } 

     private Buffer(IObservable<TSource> observable) 
     { 
      subscription = observable.Subscribe(this); 
     } 

     void IObserver<TSource>.OnNext(TSource value) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        queue.Enqueue(value); 
       else 
        observer.OnNext(value); 
      } 
     } 

     void IObserver<TSource>.OnError(Exception error) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        this.error = error; 
       else 
        observer.OnError(error); 
       stopped = true; 
      } 
     } 

     void IObserver<TSource>.OnCompleted() 
     { 
      lock (gate) 
      { 
       stopped = true; 
      } 
     } 

     public IDisposable Subscribe(IObserver<TSource> observer) 
     { 
      lock (gate) 
      { 
       if (isDisposed) 
        throw new ObjectDisposedException(string.Empty); 

       if (this.observer != null) 
        throw new NotImplementedException("A Buffer can currently only support one observer at a time"); 

       while(!queue.IsEmpty()) 
       { 
        observer.OnNext(queue.Dequeue()); 
       } 

       if (error != null) 
        observer.OnError(error); 
       else if (stopped) 
        observer.OnCompleted(); 

       this.observer = observer; 
       return Disposable.Create(() => 
              { 
               lock (gate) 
               { 
                      // Go back to buffering 
                this.observer = null; 
               } 
              }); 
      } 
     } 

     private bool IsBuffering 
     { 
      get { return observer == null; } 
     } 


     public void Dispose() 
     { 
      lock (gate) 
      { 
       subscription.Dispose(); 

       isDisposed = true; 
       subscription = null; 
       observer = null; 
      } 
     } 
    } 
} 

nào vượt qua được những bài kiểm tra (tôi đã không làm phiền kiểm tra chủ đề an toàn chưa):

private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world"); 

[Test] 
public void ReplaysBufferedValuesToFirstSubscriber() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    Assert.That(observed, Is.EquivalentTo(new []{1,2})); 
} 

[Test] 
public void PassesNewValuesToObserver() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 
    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    underlying.OnNext(1); 
    underlying.OnNext(2); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 })); 
} 


[Test] 
public void DisposesOfSubscriptions() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)) 
     .Dispose(); 

    underlying.OnNext(1); 

    Assert.That(observed, Is.Empty); 
} 

[Test] 
public void StartsBufferingAgainWhenSubscriptionIsDisposed() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    // These should be buffered 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var firstSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add))) 
    { 
     // Should be passed through to first subscription 
     underlying.OnNext(3); 
    } 
    Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 })); 

    // First subscription has been disposed- 
    // we should be back to buffering again 
    underlying.OnNext(4); 
    underlying.OnNext(5); 

    var secondSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add))) 
    { 
     // Should be passed through to second subscription 
     underlying.OnNext(6); 
    } 
    Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5 ,6})); 
} 

[Test] 
public void DoesNotSupportTwoConcurrentObservers() 
{ 
    // Use .Publish() if you need to do this 

    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    buffer.Subscribe(Observer.Create<int>(i => { })); 

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void CannotBeUsedAfterDisposal() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void ReplaysBufferedError() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    var observed = new List<int>(); 
    Exception foundException = null; 
    buffer.Subscribe(
     observed.Add, 
     e => foundException = e); 

    Assert.That(observed, Is.EquivalentTo(new []{1})); 
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletion() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    var observed = new List<int>(); 
    var completed = false; 
    buffer.Subscribe(
     observed.Add, 
     () => completed=true); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1 })); 
    Assert.True(completed); 
} 

[Test] 
public void ReplaysBufferedErrorToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    // Drain value queue 
    using (buffer.Subscribe(Observer.Create<int>(i => { }, e => { }))) ; 

    var observered = new List<int>(); 
    Exception exceptionEncountered = null; 
    using (buffer.Subscribe(Observer.Create<int>(observered.Add, e=>exceptionEncountered=e))); 

    Assert.That(observered, Is.Empty); 
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletionToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    // Drain value queue 
    using (buffer.Subscribe(Observer.Create<int>(i => { }))) ; 

    var observered = new List<int>(); 
    var completed = false; 
    using (buffer.Subscribe(Observer.Create<int>(observered.Add,()=>completed=true))); 

    Assert.That(observered, Is.Empty); 
    Assert.True(completed); 
} 



[Test] 
public void DisposingOfBufferDisposesUnderlyingSubscription() 
{ 
    var underlyingSubscriptionWasDisposed = false; 
    var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed= true )); 

    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.True(underlyingSubscriptionWasDisposed); 
} 
0

Nếu dữ liệu lịch sử và sống của bạn là cả thời gian-or-scheduler-based, có nghĩa là, các dòng sự kiện trông như thế này theo thời gian:

|----------------------------------------------------> time 
    h h h h h h         historical 
       l l l l l l      live 

Bạn có thể sử dụng một đơn giản TakeUntil xây dựng:

var historicalStream = <fetch historical data>; 
var liveStream = <fetch live data>; 

var mergedWithoutOverlap = 
    // pull from historical 
    historicalStream 
     // until we start overlapping with live 
     .TakeUntil(liveStream) 
     // then continue with live data 
     .Concat(liveStream); 

Nếu bạn nhận được tất cả các dữ liệu lịch sử của bạn tất cả cùng một lúc, giống như một IEnumerable<T>, bạn có thể sử dụng một sự kết hợp của StartWith và logic khác của bạn:

var historicalData = <get IEnumerable of tick data>; 
var liveData = <get IObservable of tick data>; 

var mergedWithOverlap = 
    // the observable is the "long running" feed 
    liveData 
    // But we'll inject the historical data in front of it 
    .StartWith(historicalData) 
    // Perform filtering based on your needs 
    .Where(....); 
1

Làm thế nào về một cái gì đó như:

public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector) 
{ 
    var replaySubject = new ReplaySubject<T>(); 
    live.Subscribe(replaySubject); 
    return history.Concat(replaySubject).Distinct(selector); 
} 

này sử dụng một id chuỗi và khác biệt để lọc các bản sao.

Và các bài kiểm tra tương ứng:

var testScheduler = new TestScheduler(); 

var history = testScheduler.CreateColdObservable(
    OnNext(1L, new PriceTick { PriceId = 1 }), 
    OnNext(2L, new PriceTick { PriceId = 2 }), 
    OnNext(3L, new PriceTick { PriceId = 3 }), 
    OnNext(4L, new PriceTick { PriceId = 4 }), 
    OnCompleted(new PriceTick(), 5L)); 

var live = testScheduler.CreateHotObservable(
    OnNext(1L, new PriceTick { PriceId = 3 }), 
    OnNext(2L, new PriceTick { PriceId = 4 }), 
    OnNext(3L, new PriceTick { PriceId = 5 }), 
    OnNext(4L, new PriceTick { PriceId = 6 }), 
    OnNext(5L, new PriceTick { PriceId = 7 }), 
    OnNext(6L, new PriceTick { PriceId = 8 }), 
    OnNext(7L, new PriceTick { PriceId = 9 }) 
    ); 


live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId)); 
history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId),() => Console.WriteLine("C")); 

var combined = live.CombineWithHistory(history, t => t.PriceId); 

combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId)); 

testScheduler.AdvanceTo(6L); 

Nếu bạn thực hiện bài kiểm tra này, kết hợp phát ra giá ve với id từ 1 tới 8.

+0

Cảm ơn Dave, điều này đã dạy cho tôi một số thủ thuật mới tốt với lịch trình. Vấn đề với việc sử dụng Distinct chỉ trong trường hợp của chúng tôi là: 1) Vì dữ liệu trở lại theo một hoặc nhiều bọ hơn là giá trị đơn, chúng tôi phải chọn SelectMany trước khi gọi bộ chọn. Điều đó là không thể cho khối lượng dữ liệu và yêu cầu hiệu suất của chúng tôi. 2) Để tiết kiệm bộ nhớ, các dấu tick của chúng tôi có dấu thời gian với độ chính xác của giây. Chúng ta có thể có vài dấu tích trong một giây với cùng một giá trị, do đó không thể viết hàm chọn chính xác ngữ nghĩa mà không có trạng thái. –

+0

Tôi thực sự cải thiện câu trả lời này cos Tôi thực sự đã làm nó bản thân mình nhưng quên đăng mã. Cuối cùng, tôi về cơ bản xếp hàng các ve và sau đó xóa chúng khi hoàn thành lịch sử. Bạn cần phải có một số id trình tự để đảm bảo rằng bạn không bị mất bất kỳ dữ liệu nào. –

+0

Ngoài ra, mã tôi đã cung cấp không đáp ứng yêu cầu của bạn vì chủ đề phát lại sẽ giữ toàn bộ lịch sử. –

0

Một cách thuận tiện về mặt bộ nhớ và giao dịch chồng chéo (chính xác).
Đang chờ phản hồi của bạn:

var tradeIds = new HashSet<string>(); 
var replayQuotationTrades = new ReplaySubject<IntradayTrade>(); 
var replaySubscription = _quotationTrades.Subscribe(replayQuotationTrades); 
return _historyTrades 
       .DelaySubscription(TimeSpan.FromMilliseconds(500), _backgroundScheduler) 
       .Do(t => tradeIds.Add(t.TradeId)) 
       .Finally(() => DisposeAndCompleteReplayStream(replaySubscription, replayQuotationTrades)) 
       .Concat(replayQuotationTrades.Where(t => !tradeIds.Contains(t.TradeId))) 
       .Finally(tradeIds.Clear) 
       .Concat(_quotationTrades) 
       .Subscribe(observer); 
Các vấn đề liên quan