2014-10-22 13 views
6

Tôi có tệp CSV lớn ... 10 cột, 100 triệu hàng, có kích thước khoảng 6 GB trên đĩa cứng của tôi. Tôi muốn đọc dòng tệp CSV này và sau đó tải dữ liệu vào cơ sở dữ liệu máy chủ Microsoft SQL bằng cách sử dụng bản sao số lượng lớn SQL. Tôi đã đọc vài chủ đề ở đây và trên internet. Hầu hết mọi người đề nghị rằng đọc một tập tin CSV song song không mua nhiều về hiệu quả như các nhiệm vụ/chủ đề tranh giành quyền truy cập đĩa.Xuất tệp CSV lớn song song với máy chủ SQL

Điều tôi đang cố gắng làm là đọc từng dòng một từ CSV và thêm nó vào để chặn thu thập kích thước 100 nghìn hàng. Và một khi bộ sưu tập này đầy đủ quay lên một nhiệm vụ/luồng mới để ghi dữ liệu vào máy chủ SQL bằng cách sử dụng API SQLBuckCopy.

Tôi đã viết đoạn mã này, nhưng nhấn một lỗi vào thời gian chạy có nội dung "Cố gắng gọi bản sao hàng loạt trên một đối tượng có hoạt động đang chờ xử lý". Kịch bản này trông giống như một cái gì đó mà có thể dễ dàng giải quyết bằng cách sử dụng NET 4.0 TPL nhưng tôi không thể làm cho nó hoạt động. Bất kỳ đề xuất về những gì tôi đang làm sai?

public static void LoadCsvDataInParalleToSqlServer(string fileName, string connectionString, string table, DataColumn[] columns, bool truncate) 
    { 
     const int inputCollectionBufferSize = 1000000; 
     const int bulkInsertBufferCapacity = 100000; 
     const int bulkInsertConcurrency = 8; 

     var sqlConnection = new SqlConnection(connectionString); 
     sqlConnection.Open(); 

     var sqlBulkCopy = new SqlBulkCopy(sqlConnection.ConnectionString, SqlBulkCopyOptions.TableLock) 
     { 
      EnableStreaming = true, 
      BatchSize = bulkInsertBufferCapacity, 
      DestinationTableName = table, 
      BulkCopyTimeout = (24 * 60 * 60), 
     }; 

     BlockingCollection<DataRow> rows = new BlockingCollection<DataRow>(inputCollectionBufferSize); 
     DataTable dataTable = new DataTable(table); 
     dataTable.Columns.AddRange(columns); 

     Task loadTask = Task.Factory.StartNew(() => 
      { 
       foreach (DataRow row in ReadRows(fileName, dataTable)) 
       { 
        rows.Add(row); 
       } 

       rows.CompleteAdding(); 
      }); 

     List<Task> insertTasks = new List<Task>(bulkInsertConcurrency); 

     for (int i = 0; i < bulkInsertConcurrency; i++) 
     { 
      insertTasks.Add(Task.Factory.StartNew((x) => 
       { 
        List<DataRow> bulkInsertBuffer = new List<DataRow>(bulkInsertBufferCapacity); 

        foreach (DataRow row in rows.GetConsumingEnumerable()) 
        { 
         if (bulkInsertBuffer.Count == bulkInsertBufferCapacity) 
         { 
          SqlBulkCopy bulkCopy = x as SqlBulkCopy; 
          var dataRows = bulkInsertBuffer.ToArray(); 
          bulkCopy.WriteToServer(dataRows); 
          Console.WriteLine("Inserted rows " + bulkInsertBuffer.Count); 
          bulkInsertBuffer.Clear(); 
         } 

         bulkInsertBuffer.Add(row); 
        } 

       }, 
       sqlBulkCopy)); 
     } 

     loadTask.Wait(); 
     Task.WaitAll(insertTasks.ToArray()); 
    } 

    private static IEnumerable<DataRow> ReadRows(string fileName, DataTable dataTable) 
    { 
     using (var textFieldParser = new TextFieldParser(fileName)) 
     { 
      textFieldParser.TextFieldType = FieldType.Delimited; 
      textFieldParser.Delimiters = new[] { "," }; 
      textFieldParser.HasFieldsEnclosedInQuotes = true; 

      while (!textFieldParser.EndOfData) 
      { 
       string[] cols = textFieldParser.ReadFields(); 

       DataRow row = dataTable.NewRow(); 

       for (int i = 0; i < cols.Length; i++) 
       { 
        if (string.IsNullOrEmpty(cols[i])) 
        { 
         row[i] = DBNull.Value; 
        } 
        else 
        { 
         row[i] = cols[i]; 
        } 
       } 

       yield return row; 
      } 
     } 
    } 
+9

Thay vì dành thời gian viết công cụ của riêng bạn, tại sao không sử dụng công cụ ETL đã thực hiện điều này như Dịch vụ tích hợp SQL Server. –

+2

Bạn đã thử một phiên bản tuần tự của mã này và bạn đã chứng minh sự phức tạp của đa luồng có đáng để đạt được hiệu suất không? – Cory

+2

Có nhiều hướng dẫn trực tuyến để tối ưu hóa chèn hàng loạt, tức là http://technet.microsoft.com/en-us/library/ms190421(v=sql.105).aspx. Có vẻ như bạn đang cố gắng giải quyết vấn đề mà bạn chưa chứng minh. Tôi đề nghị bạn đầu tiên có được một đường cơ sở chỉ đơn giản bằng cách sử dụng 'BCP.EXE' và sau đó thử và cải thiện vào thời gian đó. –

Trả lời

6

Không.

Truy cập song song có thể hoặc không cho phép bạn đọc nhanh hơn tệp (nó sẽ không, nhưng tôi sẽ không chiến đấu rằng trận chiến ...) nhưng đối với một số ghi song song, nó sẽ không cung cấp cho bạn chèn số lượng lớn nhanh hơn. Đó là do chèn số lượng lớn được ghi tối thiểu (ví dụ: thực sự nhanh chóng chèn số lượng lớn) yêu cầu khóa bàn. Xem Prerequisites for Minimal Logging in Bulk Import:

khai thác gỗ tối thiểu đòi hỏi rằng các bảng mục tiêu đáp ứng các điều kiện sau đây:

...
- Bảng khóa được quy định (sử dụng TABLOCK).
...

Chèn song song, theo định nghĩa, không thể có được ổ khóa đồng thời. QED. Bạn đang sủa cây sai.

Ngừng nhận nguồn của bạn từ tìm kiếm ngẫu nhiên trên internet. Đọc The Data Loading Performance Guide, là hướng dẫn để ... tải dữ liệu có hiệu suất.

Tôi khuyên bạn nên ngừng phát minh ra bánh xe. Sử dụng SSIS, đây là chính xác những gì được thiết kế để xử lý.

+0

Ok. Bạn có thể chỉ cho tôi một gói SSIS hiện có mà số lượng lớn chèn dữ liệu từ một tệp csv vào một bảng SQL không? Tôi không muốn tái phát minh ra bánh xe như bạn đã nói và vì vậy muốn sử dụng một số giải pháp đã tồn tại trước đó thay vì tạo gói ssis trên của riêng tôi – user330612

+0

Tất cả những gì bạn cần là [Nguồn Tệp Phẳng] (http: // msdn .microsoft.com/vi-us/library/ms139941.aspx) được kết nối với [Điểm đến OleDB] (http://msdn.microsoft.com/en-us/library/ms141237.aspx) với bộ tải nhanh. Xem [Nhập tệp CSV vào bảng cơ sở dữ liệu sử dụng SSIS] (http://blog.sqlauthority.com/2011/05/12/sql-server-import-csv-file-into-database-table-using-ssis/), ví dụ. –

+0

>> nhưng đối với một số song song viết nó sẽ không cung cấp cho bạn chèn số lượng lớn nhanh hơn. << Điều này không đúng. Tôi đã sử dụng một cái gì đó tương tự, PowerShell runspaces, với khóa bảng, và tôi đã đi từ 90.000 hàng/giây đến 140.000 hàng/giây. –

5

http://joshclose.github.io/CsvHelper/

https://efbulkinsert.codeplex.com/

Nếu có thể cho bạn, tôi đề nghị bạn đọc tập tin của bạn vào một danh sách < T> sử dụng csvhelper nói trên và ghi vào db của bạn sử dụng số lượng lớn chèn như bạn đang làm hoặc efbulkinsert mà tôi đã sử dụng và rất nhanh.

using CsvHelper; 

public static List<T> CSVImport<T,TClassMap>(string csvData, bool hasHeaderRow, char delimiter, out string errorMsg) where TClassMap : CsvHelper.Configuration.CsvClassMap 
    { 
     errorMsg = string.Empty; 
     var result = Enumerable.Empty<T>(); 

     MemoryStream memStream = new MemoryStream(Encoding.UTF8.GetBytes(csvData)); 
     StreamReader streamReader = new StreamReader(memStream); 
     var csvReader = new CsvReader(streamReader); 

     csvReader.Configuration.RegisterClassMap<TClassMap>(); 
     csvReader.Configuration.DetectColumnCountChanges = true; 
     csvReader.Configuration.IsHeaderCaseSensitive = false; 
     csvReader.Configuration.TrimHeaders = true; 
     csvReader.Configuration.Delimiter = delimiter.ToString(); 
     csvReader.Configuration.SkipEmptyRecords = true; 
     List<T> items = new List<T>(); 

     try 
     { 
      items = csvReader.GetRecords<T>().ToList(); 
     } 
     catch (Exception ex) 
     { 
      while (ex != null) 
      { 
       errorMsg += ex.Message + Environment.NewLine; 

       foreach (var val in ex.Data.Values) 
        errorMsg += val.ToString() + Environment.NewLine; 

       ex = ex.InnerException; 
      } 
     } 
     return items; 
    } 
} 

Chỉnh sửa - Tôi không hiểu bạn đang làm gì với chèn hàng loạt. Bạn muốn chèn số lượng lớn toàn bộ danh sách hoặc bảng dữ liệu dữ liệu, chứ không phải từng hàng.

+0

Các csv dường như rất lớn. (6 GB). Liệu 'GetRecords () .ToList()' tải mọi thứ vào bộ nhớ? – AechoLiu

+0

có - điểm tốt, nó có thể không được có thể cho anh ta. BulkInserting trong một gulp là một tiết kiệm thời gian lớn tho. Có lẽ anh ta có thể gọi Take() trong danh sách để chunk nó lên một chút. Danh sách của ông dường như phù hợp với một chuỗi được tạo ra bởi File.ReadLines. – Sam

+0

Tôi không thể tải toàn bộ tệp csv trong bộ nhớ vì kích thước. Vì vậy, tôi cần phải đọc 100K dòng tại một thời điểm và sau đó viết nó vào máy chủ SQL bằng cách sử dụng bulkinsert. Và có, tôi muốn viết toàn bộ bảng cùng một lúc chứ không phải một hàng tại một thời điểm. – user330612

3

Bạn có thể tạo thủ tục lưu trữ và vượt qua các vị trí tập tin như dưới đây

CREATE PROCEDURE [dbo].[CSVReaderTransaction] 
    @Filepath varchar(100)='' 
AS 
-- STEP 1: Start the transaction 
BEGIN TRANSACTION 

-- STEP 2 & 3: checking @@ERROR after each statement 
EXEC ('BULK INSERT Employee FROM ''' [email protected] 
     +''' WITH (FIELDTERMINATOR = '','', ROWTERMINATOR = ''\n'')') 

-- Rollback the transaction if there were any errors 
IF @@ERROR <> 0 
BEGIN 
    -- Rollback the transaction 
    ROLLBACK 

    -- Raise an error and return 
    RAISERROR ('Error in inserting data into employee Table.', 16, 1) 
    RETURN 
END 

COMMIT TRANSACTION 

Bạn cũng có thể thêm tùy chọn BATCHSIZE như FIELDTERMINATOR và ROWTERMINATOR.

+0

doesnt làm việc cho tôi coz tập tin là trên máy địa phương của tôi và máy chủ SQL là trên một máy khác nhau và máy đó không có quyền truy cập từ xa vào ổ đĩa địa phương của tôi – user330612

Các vấn đề liên quan