Tôi có tệp CSV lớn ... 10 cột, 100 triệu hàng, có kích thước khoảng 6 GB trên đĩa cứng của tôi. Tôi muốn đọc dòng tệp CSV này và sau đó tải dữ liệu vào cơ sở dữ liệu máy chủ Microsoft SQL bằng cách sử dụng bản sao số lượng lớn SQL. Tôi đã đọc vài chủ đề ở đây và trên internet. Hầu hết mọi người đề nghị rằng đọc một tập tin CSV song song không mua nhiều về hiệu quả như các nhiệm vụ/chủ đề tranh giành quyền truy cập đĩa.Xuất tệp CSV lớn song song với máy chủ SQL
Điều tôi đang cố gắng làm là đọc từng dòng một từ CSV và thêm nó vào để chặn thu thập kích thước 100 nghìn hàng. Và một khi bộ sưu tập này đầy đủ quay lên một nhiệm vụ/luồng mới để ghi dữ liệu vào máy chủ SQL bằng cách sử dụng API SQLBuckCopy.
Tôi đã viết đoạn mã này, nhưng nhấn một lỗi vào thời gian chạy có nội dung "Cố gắng gọi bản sao hàng loạt trên một đối tượng có hoạt động đang chờ xử lý". Kịch bản này trông giống như một cái gì đó mà có thể dễ dàng giải quyết bằng cách sử dụng NET 4.0 TPL nhưng tôi không thể làm cho nó hoạt động. Bất kỳ đề xuất về những gì tôi đang làm sai?
public static void LoadCsvDataInParalleToSqlServer(string fileName, string connectionString, string table, DataColumn[] columns, bool truncate)
{
const int inputCollectionBufferSize = 1000000;
const int bulkInsertBufferCapacity = 100000;
const int bulkInsertConcurrency = 8;
var sqlConnection = new SqlConnection(connectionString);
sqlConnection.Open();
var sqlBulkCopy = new SqlBulkCopy(sqlConnection.ConnectionString, SqlBulkCopyOptions.TableLock)
{
EnableStreaming = true,
BatchSize = bulkInsertBufferCapacity,
DestinationTableName = table,
BulkCopyTimeout = (24 * 60 * 60),
};
BlockingCollection<DataRow> rows = new BlockingCollection<DataRow>(inputCollectionBufferSize);
DataTable dataTable = new DataTable(table);
dataTable.Columns.AddRange(columns);
Task loadTask = Task.Factory.StartNew(() =>
{
foreach (DataRow row in ReadRows(fileName, dataTable))
{
rows.Add(row);
}
rows.CompleteAdding();
});
List<Task> insertTasks = new List<Task>(bulkInsertConcurrency);
for (int i = 0; i < bulkInsertConcurrency; i++)
{
insertTasks.Add(Task.Factory.StartNew((x) =>
{
List<DataRow> bulkInsertBuffer = new List<DataRow>(bulkInsertBufferCapacity);
foreach (DataRow row in rows.GetConsumingEnumerable())
{
if (bulkInsertBuffer.Count == bulkInsertBufferCapacity)
{
SqlBulkCopy bulkCopy = x as SqlBulkCopy;
var dataRows = bulkInsertBuffer.ToArray();
bulkCopy.WriteToServer(dataRows);
Console.WriteLine("Inserted rows " + bulkInsertBuffer.Count);
bulkInsertBuffer.Clear();
}
bulkInsertBuffer.Add(row);
}
},
sqlBulkCopy));
}
loadTask.Wait();
Task.WaitAll(insertTasks.ToArray());
}
private static IEnumerable<DataRow> ReadRows(string fileName, DataTable dataTable)
{
using (var textFieldParser = new TextFieldParser(fileName))
{
textFieldParser.TextFieldType = FieldType.Delimited;
textFieldParser.Delimiters = new[] { "," };
textFieldParser.HasFieldsEnclosedInQuotes = true;
while (!textFieldParser.EndOfData)
{
string[] cols = textFieldParser.ReadFields();
DataRow row = dataTable.NewRow();
for (int i = 0; i < cols.Length; i++)
{
if (string.IsNullOrEmpty(cols[i]))
{
row[i] = DBNull.Value;
}
else
{
row[i] = cols[i];
}
}
yield return row;
}
}
}
Thay vì dành thời gian viết công cụ của riêng bạn, tại sao không sử dụng công cụ ETL đã thực hiện điều này như Dịch vụ tích hợp SQL Server. –
Bạn đã thử một phiên bản tuần tự của mã này và bạn đã chứng minh sự phức tạp của đa luồng có đáng để đạt được hiệu suất không? – Cory
Có nhiều hướng dẫn trực tuyến để tối ưu hóa chèn hàng loạt, tức là http://technet.microsoft.com/en-us/library/ms190421(v=sql.105).aspx. Có vẻ như bạn đang cố gắng giải quyết vấn đề mà bạn chưa chứng minh. Tôi đề nghị bạn đầu tiên có được một đường cơ sở chỉ đơn giản bằng cách sử dụng 'BCP.EXE' và sau đó thử và cải thiện vào thời gian đó. –