2016-04-19 25 views
5

Tôi đang nhận dữ liệu (được truyền trực tuyến) từ nguồn bên ngoài (trên Lightstreamer) vào ứng dụng C# của tôi. Ứng dụng C# của tôi nhận dữ liệu từ người nghe. Dữ liệu từ người nghe được lưu trữ trong một hàng đợi (ConcurrentQueue). Hàng đợi được dọn dẹp cứ sau 0,5 giây với TryDequeue vào một DataTable. DataTable sau đó sẽ được sao chép vào một cơ sở dữ liệu SQL bằng cách sử dụng SqlBulkCopy. Cơ sở dữ liệu SQL xử lý dữ liệu vừa mới đến từ bảng dàn vào bảng cuối cùng. Hiện tại tôi nhận được khoảng 300'000 hàng mỗi ngày (có thể tăng mạnh trong vòng vài tuần tới) và mục tiêu của tôi là ở lại dưới 1 giây kể từ khi tôi nhận được dữ liệu cho đến khi chúng có sẵn trong bảng SQL cuối cùng. Hiện tại, các hàng tối đa mỗi giây tôi phải xử lý là khoảng 50 hàng.C# cách nhanh nhất để chèn dữ liệu vào cơ sở dữ liệu SQL

Thật không may, vì nhận được ngày càng nhiều dữ liệu, logic của tôi đang chậm hơn trong hiệu suất (vẫn còn dưới 1 giây, nhưng tôi muốn tiếp tục cải thiện). Các nút cổ chai chính (cho đến nay) là việc xử lý dữ liệu dàn dựng (trên cơ sở dữ liệu SQL) vào bảng cuối cùng. Để cải thiện hiệu suất, tôi muốn chuyển bảng phân tầng thành bảng tối ưu hóa bộ nhớ. Bảng cuối cùng đã là một bảng tối ưu hóa bộ nhớ, do đó chúng sẽ hoạt động tốt với nhau.

Câu hỏi của tôi:

  1. Có cách nào để sử dụng SqlBulkCopy (trong số C#) với các bảng bộ nhớ được tối ưu hóa? (theo như tôi biết chưa có cách nào)
  2. Bất kỳ đề xuất nào về cách nhanh nhất để ghi dữ liệu đã nhận từ ứng dụng C# của tôi vào bảng dàn xếp được tối ưu hóa bộ nhớ?

EDIT (bằng dung dịch):

Sau các ý kiến ​​/ câu trả lời và đánh giá hiệu suất tôi quyết định từ bỏ sự chèn số lượng lớn và sử dụng SqlCommand để bàn giao một IEnumerable với dữ liệu của tôi như tham số bảng giá trị vào một thủ tục lưu trữ được biên dịch tự nhiên để lưu trữ dữ liệu trực tiếp trong bảng cuối cùng được tối ưu hóa bộ nhớ của tôi (cũng như một bản sao vào bảng "dàn dựng" hiện đang đóng vai trò là bản lưu trữ). Hiệu suất tăng lên đáng kể (thậm chí tôi đã không cân nhắc việc song song các phần chèn (sẽ ở giai đoạn sau)).

Đây là một phần của mã:

Memory được tối ưu hóa người dùng xác định loại bảng (để bàn giao dữ liệu từ C# vào SQL (stored procedure):

CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
    [CityIndexInstrumentID] [int] NOT NULL, 
    [CityIndexTimeStamp] [bigint] NOT NULL, 
    [BidPrice] [numeric](18, 8) NOT NULL, 
    [AskPrice] [numeric](18, 8) NOT NULL, 
    INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED 
(
    [CityIndexInstrumentID] ASC, 
    [CityIndexTimeStamp] ASC, 
    [BidPrice] ASC, 
    [AskPrice] ASC 
) 
) 
WITH (MEMORY_OPTIMIZED = ON) 

Native biên soạn các thủ tục lưu trữ để chèn dữ liệu vào bảng cuối cùng và dàn dựng (trong đó đóng vai trò lưu trữ trong trường hợp này):

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging] 
(
    @ProcessingID int, 
    @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly 
) 
with native_compilation, schemabinding, execute as owner 
as 
begin atomic 
with (transaction isolation level=snapshot, language=N'us_english') 


    -- store prices 

    insert into TimeSeries.CityIndexIntradayLivePrices 
    (
     ObjectID, 
     PerDateTime, 
     BidPrice, 
     AskPrice, 
     ProcessingID 
    ) 
    select Objects.ObjectID, 
    CityIndexTimeStamp, 
    CityIndexIntradayLivePricesStaging.BidPrice, 
    CityIndexIntradayLivePricesStaging.AskPrice, 
    @ProcessingID 
    from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging, 
    Objects.Objects 
    where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID 


    -- store data in staging table 

    insert into Staging.CityIndexIntradayLivePricesStaging 
    (
     ImportProcessingID, 
     CityIndexInstrumentID, 
     CityIndexTimeStamp, 
     BidPrice, 
     AskPrice 
    ) 
    select @ProcessingID, 
    CityIndexInstrumentID, 
    CityIndexTimeStamp, 
    BidPrice, 
    AskPrice 
    from @CityIndexIntradayLivePrices 


end 

IEnumerable tràn đầy từ hàng đợi:

0.123.
private static IEnumerable<SqlDataRecord> CreateSqlDataRecords() 
{ 


    // set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter) 

    SqlMetaData MetaDataCol1; 
    SqlMetaData MetaDataCol2; 
    SqlMetaData MetaDataCol3; 
    SqlMetaData MetaDataCol4; 

    MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int); 
    MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt); 
    MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale 
    MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale 


    // define sql data record with the columns 

    SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 }); 


    // remove each price row from queue and add it to the sql data record 

    LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO(); 

    while (IntradayQuotesQueue.TryDequeue(out PriceDTO)) 
    { 

     DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id 
     DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with/as escape sequence) 
     DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price 
     DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price 

     yield return DataRecord; 

    } 


} 

Xử lý dữ liệu mỗi 0,5 giây:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID) 
{ 


    try 
    { 

     // open new sql connection 

     using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false")) 
     { 


      // open connection 

      TimeSeriesDatabaseSQLConnection.Open(); 


      // endless loop to keep thread alive 

      while(true) 
      { 


       // ensure queue has rows to process (otherwise no need to continue) 

       if(IntradayQuotesQueue.Count > 0) 
       { 


        // define stored procedure for sql command 

        SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection); 


        // set command type to stored procedure 

        InsertCommand.CommandType = CommandType.StoredProcedure; 


        // define sql parameters (table-value parameter gets data from CreateSqlDataRecords()) 

        SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter 
        SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter 


        // set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters) 

        ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured; 


        // execute stored procedure 

        InsertCommand.ExecuteNonQuery(); 


       } 


       // wait 0.5 seconds 

       Thread.Sleep(500); 


      } 

     } 

    } 
    catch (Exception e) 
    { 

     // handle error (standard error messages and update processing) 

     ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e); 

    }; 


} 
+0

Nhìn vào TVP (tham số giá trị bảng) - bạn có thể sử dụng giống như một DataReader ngược lại. https://lennilobel.wordpress.com/2009/07/29/sql-server-2008-table-valued-parameters-and-c-custom-iterators-a-match-made-in-heaven/ – Paparazzi

Trả lời

2

Sử dụng SQL Server 2016 (RTM nó không nêu ra, nhưng nó đã tốt hơn nhiều so với năm 2014 khi nói đến bảng bộ nhớ được tối ưu hóa). Sau đó, hãy sử dụng số memory-optimized table variable hoặc chỉ cần thực hiện một loạt lệnh gọi native stored procedure trong một giao dịch, mỗi lần thực hiện một lần chèn, tùy thuộc vào điều gì nhanh hơn trong kịch bản của bạn (điều này thay đổi).Một vài điều cần lưu ý:

  • Thực hiện nhiều lần chèn trong một giao dịch là rất quan trọng để tiết kiệm chi phí trên mạng. Trong khi các hoạt động trong bộ nhớ rất nhanh, SQL Server vẫn cần phải xác nhận mọi hoạt động.
  • Tùy thuộc vào cách bạn đang tạo dữ liệu, bạn có thể thấy rằng việc song song các chèn có thể tăng tốc độ (đừng lạm dụng nó; bạn sẽ nhanh chóng đạt đến điểm bão hòa). Đừng cố gắng rất thông minh ở đây; đòn bẩy async/await và/hoặc Parallel.ForEach.
  • Nếu bạn đang chuyển thông số có giá trị bảng, cách dễ nhất để thực hiện là vượt qua giá trị tham số DataTable, nhưng đây không phải là cách hiệu quả nhất để thực hiện - có thể sẽ chuyển qua IEnumerable<SqlDataRecord>. Bạn có thể sử dụng một phương thức iterator để tạo ra các giá trị, do đó chỉ có một lượng bộ nhớ không đổi được cấp phát.

Bạn sẽ phải thử nghiệm một chút để tìm cách tối ưu để truyền dữ liệu; điều này phụ thuộc rất nhiều vào kích thước dữ liệu của bạn và cách bạn nhận được dữ liệu đó.

+0

Tôi hiện đang bằng cách sử dụng SQL Server 2014 và tôi đã có thể thực hiện giải pháp của mình với nó (mặc dù tôi đã phải thực hiện một số thỏa hiệp nhỏ). Nhưng tôi sẽ xem xét SQL Server 2016 càng sớm càng tốt. IEnumerable hoạt động tốt, nhanh hơn so với DataTable. Việc sử dụng tham số bảng có giá trị tối ưu hóa bộ nhớ và thủ tục lưu sẵn được biên dịch tự nhiên làm giảm khối lượng công việc cơ sở dữ liệu rất nhiều. – Reboon

0

Ghép dữ liệu từ bảng dàn vào bảng cuối cùng trong hàng đếm nhỏ hơn 5k, tôi thường sử dụng 4k và không chèn chúng vào giao dịch. Thay vào đó, hãy triển khai các giao dịch có lập trình nếu cần. Việc lưu giữ dưới 5k hàng được chèn sẽ giữ số lượng khóa hàng từ việc leo thang vào một khóa bảng, điều này phải chờ cho đến khi mọi người khác rời khỏi bàn.

0

Bạn có chắc đó là logic của bạn làm chậm và không thực sự giao dịch với cơ sở dữ liệu? Ví dụ, Entity Framework là "nhạy cảm", vì thiếu một thuật ngữ tốt hơn, khi cố gắng chèn một tấn hàng và trở nên khá chậm.

Có một thư viện của bên thứ ba, BulkInsert, trên CodePlex mà tôi đã sử dụng và nó khá tốt đẹp để làm chèn số lượng lớn dữ liệu: https://efbulkinsert.codeplex.com/

Bạn cũng có thể viết phương pháp khuyến nông của riêng bạn trên DBContext nếu bạn EF mà không điều này cũng có thể dựa trên số lượng bản ghi. Bất cứ điều gì dưới 5000 hàng sử dụng Save(), bất cứ điều gì trên đó bạn có thể gọi logic chèn số lượng lớn của riêng bạn.

Các vấn đề liên quan