Tôi đang nhận dữ liệu (được truyền trực tuyến) từ nguồn bên ngoài (trên Lightstreamer) vào ứng dụng C# của tôi. Ứng dụng C# của tôi nhận dữ liệu từ người nghe. Dữ liệu từ người nghe được lưu trữ trong một hàng đợi (ConcurrentQueue). Hàng đợi được dọn dẹp cứ sau 0,5 giây với TryDequeue vào một DataTable. DataTable sau đó sẽ được sao chép vào một cơ sở dữ liệu SQL bằng cách sử dụng SqlBulkCopy. Cơ sở dữ liệu SQL xử lý dữ liệu vừa mới đến từ bảng dàn vào bảng cuối cùng. Hiện tại tôi nhận được khoảng 300'000 hàng mỗi ngày (có thể tăng mạnh trong vòng vài tuần tới) và mục tiêu của tôi là ở lại dưới 1 giây kể từ khi tôi nhận được dữ liệu cho đến khi chúng có sẵn trong bảng SQL cuối cùng. Hiện tại, các hàng tối đa mỗi giây tôi phải xử lý là khoảng 50 hàng.C# cách nhanh nhất để chèn dữ liệu vào cơ sở dữ liệu SQL
Thật không may, vì nhận được ngày càng nhiều dữ liệu, logic của tôi đang chậm hơn trong hiệu suất (vẫn còn dưới 1 giây, nhưng tôi muốn tiếp tục cải thiện). Các nút cổ chai chính (cho đến nay) là việc xử lý dữ liệu dàn dựng (trên cơ sở dữ liệu SQL) vào bảng cuối cùng. Để cải thiện hiệu suất, tôi muốn chuyển bảng phân tầng thành bảng tối ưu hóa bộ nhớ. Bảng cuối cùng đã là một bảng tối ưu hóa bộ nhớ, do đó chúng sẽ hoạt động tốt với nhau.
Câu hỏi của tôi:
- Có cách nào để sử dụng SqlBulkCopy (trong số C#) với các bảng bộ nhớ được tối ưu hóa? (theo như tôi biết chưa có cách nào)
- Bất kỳ đề xuất nào về cách nhanh nhất để ghi dữ liệu đã nhận từ ứng dụng C# của tôi vào bảng dàn xếp được tối ưu hóa bộ nhớ?
EDIT (bằng dung dịch):
Sau các ý kiến / câu trả lời và đánh giá hiệu suất tôi quyết định từ bỏ sự chèn số lượng lớn và sử dụng SqlCommand để bàn giao một IEnumerable với dữ liệu của tôi như tham số bảng giá trị vào một thủ tục lưu trữ được biên dịch tự nhiên để lưu trữ dữ liệu trực tiếp trong bảng cuối cùng được tối ưu hóa bộ nhớ của tôi (cũng như một bản sao vào bảng "dàn dựng" hiện đang đóng vai trò là bản lưu trữ). Hiệu suất tăng lên đáng kể (thậm chí tôi đã không cân nhắc việc song song các phần chèn (sẽ ở giai đoạn sau)).
Đây là một phần của mã:
Memory được tối ưu hóa người dùng xác định loại bảng (để bàn giao dữ liệu từ C# vào SQL (stored procedure):
CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
[CityIndexInstrumentID] [int] NOT NULL,
[CityIndexTimeStamp] [bigint] NOT NULL,
[BidPrice] [numeric](18, 8) NOT NULL,
[AskPrice] [numeric](18, 8) NOT NULL,
INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
(
[CityIndexInstrumentID] ASC,
[CityIndexTimeStamp] ASC,
[BidPrice] ASC,
[AskPrice] ASC
)
)
WITH (MEMORY_OPTIMIZED = ON)
Native biên soạn các thủ tục lưu trữ để chèn dữ liệu vào bảng cuối cùng và dàn dựng (trong đó đóng vai trò lưu trữ trong trường hợp này):
create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
@ProcessingID int,
@CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')
-- store prices
insert into TimeSeries.CityIndexIntradayLivePrices
(
ObjectID,
PerDateTime,
BidPrice,
AskPrice,
ProcessingID
)
select Objects.ObjectID,
CityIndexTimeStamp,
CityIndexIntradayLivePricesStaging.BidPrice,
CityIndexIntradayLivePricesStaging.AskPrice,
@ProcessingID
from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
Objects.Objects
where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID
-- store data in staging table
insert into Staging.CityIndexIntradayLivePricesStaging
(
ImportProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
)
select @ProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
from @CityIndexIntradayLivePrices
end
IEnumerable tràn đầy từ hàng đợi:
0.123.private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{
// set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)
SqlMetaData MetaDataCol1;
SqlMetaData MetaDataCol2;
SqlMetaData MetaDataCol3;
SqlMetaData MetaDataCol4;
MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
// define sql data record with the columns
SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });
// remove each price row from queue and add it to the sql data record
LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();
while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
{
DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with/as escape sequence)
DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price
yield return DataRecord;
}
}
Xử lý dữ liệu mỗi 0,5 giây:
public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{
try
{
// open new sql connection
using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
{
// open connection
TimeSeriesDatabaseSQLConnection.Open();
// endless loop to keep thread alive
while(true)
{
// ensure queue has rows to process (otherwise no need to continue)
if(IntradayQuotesQueue.Count > 0)
{
// define stored procedure for sql command
SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);
// set command type to stored procedure
InsertCommand.CommandType = CommandType.StoredProcedure;
// define sql parameters (table-value parameter gets data from CreateSqlDataRecords())
SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter
// set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)
ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;
// execute stored procedure
InsertCommand.ExecuteNonQuery();
}
// wait 0.5 seconds
Thread.Sleep(500);
}
}
}
catch (Exception e)
{
// handle error (standard error messages and update processing)
ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
};
}
Nhìn vào TVP (tham số giá trị bảng) - bạn có thể sử dụng giống như một DataReader ngược lại. https://lennilobel.wordpress.com/2009/07/29/sql-server-2008-table-valued-parameters-and-c-custom-iterators-a-match-made-in-heaven/ – Paparazzi