2012-11-19 23 views
9

Tôi đang viết một ứng dụng C# (.NET 4.5) được sử dụng để tổng hợp các sự kiện dựa trên thời gian cho mục đích báo cáo. Để làm cho logic truy vấn của tôi có thể tái sử dụng cho cả dữ liệu lịch sử và thời gian thực, tôi sử dụng Tiện ích mở rộng phản ứng (2.0) và cơ sở hạ tầng IScheduler (HistoricalScheduler và bạn bè của mình).Tại sao Observable.Generate() ném System.StackOverflowException?

Ví dụ, giả sử chúng ta tạo ra một danh sách các sự kiện (được sắp xếp thứ tự thời gian, nhưng họ có thể trùng!) Có tải trọng chỉ ist timestamp của họ và muốn biết phân phối của họ qua bộ đệm của một khoảng thời gian cố định:

const int num = 100000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 

var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

var stream = Observable.Generate<int, DateTimeOffset>(
    0, 
    s => s < events.Count, 
    s => s + 1, 
    s => events[s], 
    s => events[s], 
    time); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

chạy mã này kết quả trong một System.StackOverflowException với stack trace sau (Nó rất 3 dòng cuối cùng tất cả các con đường xuống):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 

Ok, vấn đề dường như đến việc sử dụng của tôi Observable.Generate(), tùy thuộc vào danh sách kích thước (num) và bất kể việc chọn lịch biểu.

Tôi đang làm gì sai? Hoặc nói chung, cách nào là cách ưa thích để tạo một số IObservable từ một sự kiện IEnumerable của các sự kiện cung cấp dấu thời gian của riêng họ?

+1

Mức độ lớn hơn có thể là trước khi bạn gặp phải lỗi này? Ngoài ra, nếu bạn thực hiện một bước trong trình gỡ lỗi, dòng mã cuối cùng sẽ thực thi trước khi bạn thấy lỗi là gì? –

+0

Với tôi, ngưỡng quan trọng có vẻ là ~ 'num = 51600' (trong cấu hình Release, ít hơn một chút trong cấu hình Debug). Chuỗi quan sát dường như được tạo ra hoàn toàn. Tôi có thể nhấn breakpoint tại các biểu thức lamdba cho 'Observable.Generate()'. Ngoại lệ được ném sau cuộc gọi cuối cùng đến 'Console.WriteLine()'. –

+1

Hiểu, đây chỉ là phỏng đoán, nhưng có vẻ như đáng ngờ như luồng đang cố gắng vứt bỏ từng phần tử và mỗi phần tử đang cố gắng xử lý luồng. Bạn kết thúc với những gì cơ bản là đệ quy các cuộc gọi đến 'Hủy bỏ' hoặc' Vứt bỏ', mà thổi ngăn xếp của bạn (kích thước mặc định trong đó là 1 megabyte). Tôi không quen thuộc với 'Quan sát 'để nói tại sao điều này lại xảy ra. –

Trả lời

3

(cập nhật - nhận ra rằng tôi đã không cung cấp một sự lựa chọn: xem tại phía dưới cùng của câu trả lời)

Vấn đề là cách thức hoạt động của Observable.Generate - nó được sử dụng để mở ra máy phát điện corecursive (suy nghĩ đệ quy quay vào bên trong) dựa trên các đối số; nếu các đối số đó kết thúc bằng việc tạo ra rất trình tạo phân tích lồng nhau, bạn sẽ thổi xếp chồng của bạn.

Từ thời điểm này, Tôi đang suy đoán rất nhiều (không có nguồn Rx trước mặt mình) (xem bên dưới), nhưng tôi sẵn sàng đặt cược cho định nghĩa của bạn. :

initial_state => 
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) => 
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ... 

Và bật và tắt cho đến khi chồng cuộc gọi của bạn đủ lớn để tràn.Tại, nói, một phương thức chữ ký + int truy cập của bạn, mà muốn được một cái gì đó giống như 8-16 byte cho mỗi cuộc gọi đệ quy (tùy thuộc vào cách máy phát điện nhà nước được thực hiện), do đó, 60.000 âm thanh về bên phải (1M/16 ~ 62500 tối đa chiều sâu)

EDIT: kéo lên nguồn - khẳng định: "Run" phương pháp Tạo vẻ như thế này - lưu ý các cuộc gọi lồng nhau để Generate:

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink) 
{ 
    if (this._timeSelectorA != null) 
    { 
     Generate<TState, TResult>.α α = 
       new Generate<TState, TResult>.α(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(α); 
     return α.Run(); 
    } 
    if (this._timeSelectorR != null) 
    { 
     Generate<TState, TResult>.δ δ = 
       new Generate<TState, TResult>.δ(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(δ); 
     return δ.Run(); 
    } 
    Generate<TState, TResult>._ _ = 
      new Generate<TState, TResult>._(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
    setSink(_); 
    return _.Run(); 
} 

EDIT: derp, không phục vụ bất kỳ lựa chọn thay thế nào ... đây là một lựa chọn có thể hoạt động:

(EDIT: fixed Enumerable.Range, do đó kích thước luồng sẽ không được nhân với chunkSize)

const int num = 160000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 
var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

    // Size too big? Fine, we'll chunk it up! 
const int chunkSize = 10000; 
var numberOfChunks = events.Count/chunkSize; 

    // Generate a whole mess of streams based on start/end indices 
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count/chunkSize) - 1) 
    let startIdx = chunkIndex * chunkSize 
    let endIdx = Math.Min(events.Count, startIdx + chunkSize) 
    select Observable.Generate<int, DateTimeOffset>(
     startIdx, 
     s => s < endIdx, 
     s => s + 1, 
     s => events[s], 
     s => events[s], 
     time); 

    // E pluribus streamum 
var stream = Observable.Concat(streams); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 
+0

Cảm ơn, đó là hoàn hảo! Cũng có vẻ hiệu quả hơn so với cách giải quyết của riêng tôi. Tôi đã phải sửa chữa một lỗi nhỏ trong số học của bạn, mặc dù (xem chỉnh sửa). Tôi vẫn không thấy lý do tại sao việc triển khai đệ quy là cần thiết bên trong RX. Sau khi tất cả, nó có vẻ làm việc với RX v1.0 (vượt quá một kích thước của 60.000). Tuy nhiên, điều tra tốt đẹp, giải pháp thông minh. Cảm ơn một lần nữa! –

+0

Không sao cả! Heh - Tôi thực sự ấn tượng, tôi chỉ có * một * lỗi toán học ...;) – JerKimball

3

OK, tôi đã thực hiện một phương pháp nhà máy khác không yêu cầu biểu thức lamdba dưới dạng trạng thái chuyển đổi và bây giờ tôi không thấy bất kỳ luồng tràn ngăn xếp nào nữa. Tôi đang chưa chắc chắn nếu điều này được xem như là một câu trả lời đúng cho câu hỏi của tôi, nhưng nó hoạt động và tôi nghĩ I'd chia sẻ nó ở đây:

var stream = Observable.Create<DateTimeOffset>(o => 
    { 
     foreach (var e in events) 
     { 
      time.Schedule(e,() => o.OnNext(e)); 
     } 

     time.Schedule(events[events.Count - 1],() => o.OnCompleted()); 

     return Disposable.Empty; 
    }); 

thủ lịch các sự kiện trước khi trở về đăng ký dường như (!) lúng túng với tôi, nhưng trong trường hợp này nó có thể được thực hiện bên trong biểu thức lambda.

Nếu có bất kỳ điều gì sai về phương pháp này, vui lòng sửa tôi. Ngoài ra, tôi vẫn rất vui khi biết những giả định tiềm ẩn của System.Reactive Tôi đã vi phạm với mã ban đầu của mình.

(Ôi, tôi nên đã kiểm tra rằng trước đó: với RX v1.0, bản gốc Observable.Generate() nào trong thực tế dường như làm việc!)

Các vấn đề liên quan