2014-12-31 16 views
23

Chúng tôi đang sử dụng các chức năng suối trong RavenDB để tải, chuyển đổi và di chuyển dữ liệu giữa 2 cơ sở dữ liệu như vậy:RavenDB Suối cho kết quả bị chặn - Kết nối Khả năng phục hồi

var query = originSession.Query<T>(IndexForQuery); 

using (var stream = originSession.Advanced.Stream(query)) 
{ 
    while (stream.MoveNext()) 
    { 
     var streamedDocument = stream.Current.Document; 

     OpenSessionAndMigrateSingleDocument(streamedDocument); 
    } 
} 

Vấn đề là một trong những bộ sưu tập có hàng triệu hàng, và chúng tôi tiếp tục nhận được một IOException theo định dạng sau:

Application: MigrateToNewSchema.exe 
Framework Version: v4.0.30319 
Description: The process was terminated due to an unhandled exception. 
Exception Info: System.IO.IOException 
Stack: 
    at System.Net.ConnectStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32) 
    at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef) 
    at System.IO.StreamReader.Read(Char[], Int32, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read() 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext() 
    at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext() 
    at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection() 
    at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore) 
    at MigrateToNewSchema.Program.Main(System.String[]) 

Điều này xảy ra một cách khá dài vào streaming và tất nhiên các vấn đề kết nối tạm thời sẽ xảy ra trong loại này kỳ (phải mất giờ để hoàn thành).

Tuy nhiên, khi chúng tôi thử lại, vì chúng tôi đang sử dụng Query, chúng tôi phải bắt đầu lại từ đầu. Vì vậy, cuối cùng nếu có lỗi kết nối trong toàn bộ Stream thì chúng tôi phải thử lại lần nữa và cho đến khi nó hoạt động kết thúc.

Tôi biết bạn có thể sử dụng ETag với luồng để khởi động lại hiệu quả tại một thời điểm nhất định, tuy nhiên không có quá tải để thực hiện điều này với Query mà chúng tôi cần lọc kết quả đang được di chuyển và chỉ định bộ sưu tập chính xác. Vì vậy, trong RavenDB, có cách nào để cải thiện khả năng phục hồi nội bộ của kết nối (thuộc tính chuỗi kết nối, cài đặt nội bộ, v.v.) hoặc có hiệu quả "khôi phục" luồng trên một lỗi không? Không.

+0

tôi đã phát hiện ra [Data Subscriptions] (http://ravendb.net/docs/article-page/3.0/csharp/client-api/data- đăng ký/cách tạo dữ liệu-đăng ký), một tính năng RavenDb 3.0 cung cấp cơ chế đáng tin cậy để lặp qua bộ sưu tập tài liệu phù hợp với tiêu chí đã chỉ định và cho phép bạn dễ dàng tiếp tục từ nơi bạn đã dừng lại. Nếu ai đó sẵn sàng để cùng nhau đưa ra một số mẫu mã cho thấy cách tính năng đó có thể trả lời câu hỏi này, tôi sẽ xem xét điều đó xứng đáng với tiền thưởng. – StriplingWarrior

+0

Bạn có bị ràng buộc với việc sử dụng truy vấn không? Mặc dù nó sẽ không hiệu quả hơn, đây là di chuyển nên bộ nhớ không phải là vấn đề - tại sao không lặp lại bộ sưu tập doc thô và lọc trong bộ nhớ, vì vậy bạn có thể tiếp tục tại Etag? Đây là cách tôi xử lý tất cả các luồng, tôi không bao giờ sử dụng các truy vấn. – kamranicus

+0

@StriplingWarrior Đã một thời gian :-) Tôi không làm việc cho công ty sử dụng RavenDB nữa nhưng điều này vẫn khiến tôi quan tâm vì vậy tôi sẽ đưa ra câu trả lời với mã đăng ký dữ liệu ngày hôm nay –

Trả lời

2

Theo đề xuất từ ​​@StriplingWarrior Tôi đã tạo lại giải pháp bằng cách sử dụng Data Subscriptions.

Sử dụng phương pháp này tôi đã có thể lặp qua tất cả 2 triệu hàng (mặc dù được thừa nhận với quá trình xử lý ít hơn nhiều cho mỗi mục); 2 điểm ở đây là đã có thể giúp khi chúng tôi đang cố gắng để thực hiện cùng một logic sử dụng Luồng:

  1. lô chỉ sẽ bị xóa khỏi các thuê bao "hàng đợi" một lần thừa nhận (giống như hầu hết hàng đợi chuẩn)
    1. Các thuê bao IObserver<T> phải hoàn thành thành công cho xác nhận này được thiết lập.
    2. Thông tin này được xử lý bởi các máy chủ chứ không phải là khách hàng nên cho phép khách hàng để khởi động lại mà không ảnh hưởng đến vị trí thành công cuối cùng xử lý theo thuê bao
    3. See here for more details
  2. Như @StriplingWarrior chỉ vì bạn có thể tạo các đăng ký với bộ lọc phải xuống cấp thuộc tính, nó sẽ có thể phát lại với một tập hợp kết quả nhỏ hơn trong trường hợp ngoại lệ trong chính gói đăng ký đó.
    1. Điểm đầu tiên thực sự thay thế điều này; nhưng nó cho phép chúng tôi linh hoạt bổ sung chưa từng thấy trong các API Suối

Môi trường thử nghiệm là một cơ sở dữ liệu RavenDB 3.0 (máy tính cục bộ, chạy như một dịch vụ cửa sổ) với các thiết lập mặc định đối với một bộ sưu tập của 2 triệu bản.

mã để tạo hồ sơ giả:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    using (var bulkInsert = store.BulkInsert()) 
    { 
     for (var i = 0; i != recordsToCreate; i++) 
     { 
      var person = new Person 
      { 
       Id = Guid.NewGuid(), 
       Firstname = NameGenerator.GenerateFirstName(), 
       Lastname = NameGenerator.GenerateLastName() 
      }; 

      bulkInsert.Store(person); 
     } 
    } 
} 

Đang đăng ký vào bộ sưu tập này là sau đó một trường hợp:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>()); 

    var personSubscription = store.Subscriptions.Open<Person>(
     subscriptionId, new SubscriptionConnectionOptions() 
    { 
     BatchOptions = new SubscriptionBatchOptions() 
     { 
      // Max number of docs that can be sent in a single batch 
      MaxDocCount = 16 * 1024, 
      // Max total batch size in bytes 
      MaxSize = 4 * 1024 * 1024, 
      // Max time the subscription needs to confirm that the batch 
      // has been successfully processed 
      AcknowledgmentTimeout = TimeSpan.FromMinutes(3) 
     }, 
     IgnoreSubscribersErrors = false, 
     ClientAliveNotificationInterval = TimeSpan.FromSeconds(30) 
    }); 

    personSubscription.Subscribe(new PersonObserver()); 

    while (true) 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(500)); 
    } 
} 

Lưu ý PersonObserver; đây chỉ là một việc thực hiện cơ bản của IObserver như vậy:

public class PersonObserver : IObserver<Person> 
{ 
    public void OnCompleted() 
    { 
     Console.WriteLine("Completed"); 
    } 

    public void OnError(Exception error) 
    { 
     Console.WriteLine("Error occurred: " + error.ToString()); 
    } 

    public void OnNext(Person person) 
    { 
     Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'"); 
    } 
} 
+1

Viết tốt. Tôi thấy hữu ích khi chuyển vào một 'Task' (hoặc tạo một' Task' dựa trên 'CancellationToken' đã cho), và' await' công việc thay vì 'while (true)'. Bằng cách đó, mã gọi có thể hủy bỏ thao tác một cách an toàn mà không làm hỏng toàn bộ luồng hoặc quá trình. Tôi cũng đã đưa ra một cơ chế dựa trên ETAG để giúp di chuyển biết khi nào nó "thực hiện" đánh tất cả các tài liệu đích, vì vậy nó có thể tự dừng lại, nhưng nó khá phức tạp và không tuyệt vời cho mọi mục đích. – StriplingWarrior

Các vấn đề liên quan