2015-01-22 10 views
8

Tôi muốn nén một số bảng lớn chứa dữ liệu lịch sử hiếm khi hoặc không đọc. Trước tiên, tôi cố gắng sử dụng tính năng nén tích hợp (row, page, column stored, column-stored archive) nhưng không ai trong số chúng có thể nén các giá trị ngoài hàng (varchar(max), nvarchar(max)) và cuối cùng là cố gắng sử dụng giải pháp CLR.Nén hàng được đặt bằng CLR và GZIP

Giải pháp SQL Server Compressed Rowset Sample đang nén toàn bộ tập hợp hàng được trả về bởi truy vấn đã cho bằng cách sử dụng loại do người dùng xác định CLR.

Đối với, ví dụ:

CREATE TABLE Archive 
(
    [Date] DATETIME2 DEFAULT(GETUTCDATE()) 
    ,[Data] [dbo].[CompressedRowset] 
) 

INSERT INTO Archive([Data]) 
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]') 

Nó đang làm việc, nhưng tôi đã gặp những vấn đề sau:

  • khi tôi cố gắng để nén một tập kết quả hàng lớn, tôi nhận được như sau lỗi:

    Msg 0, Level 11, State 0, Line 0 Đã xảy ra lỗi nghiêm trọng trên lệnh hiện tại. Các kết quả, nếu có, cần được loại bỏ.

    Ngoài ra, báo cáo kết quả sau đây đang làm việc:

    SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]') 
    

    nhưng những người thân mà không:

    INSERT INTO Archive 
    SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]' 
    
    DECLARE @A [dbo].[CompressedRowset] 
    SELECT @A = [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]') 
    
  • để nén liên tiếp thiết lập các t-sql type nên được ánh xạ tới .net type; thật không may, điều này không đúng cho tất cả các loại sql - Mapping CLR Parameter Data; Tôi đã mở rộng các chức năng sau đây để xử lý các loại hơn, nhưng làm thế nào để xử lý các loại như geography ví dụ:

    static SqlDbType ToSqlType(Type t){ 
        if (t == typeof(int)){ 
         return SqlDbType.Int; 
        } 
    
        ... 
    
        if (t == typeof(Byte[])){ 
         return SqlDbType.VarBinary; 
        } else { 
         throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion"); 
        } 
    } 
    

Dưới đây là toàn bộ .net mã:

using System; 
using System.Data; 
using System.Data.SqlClient; 
using System.Data.SqlTypes; 
using Microsoft.SqlServer.Server; 
using System.IO; 
using System.Runtime.Serialization.Formatters.Binary; 
using System.IO.Compression; 
using System.Xml.Serialization; 
using System.Xml; 

[Serializable] 
[Microsoft.SqlServer.Server.SqlUserDefinedType 
    (
     Format.UserDefined 
     ,IsByteOrdered = false 
     ,IsFixedLength = false 
     ,MaxByteSize = -1 
    ) 
] 
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable 
{ 
    DataTable rowset; 

    public DataTable Data 
    { 
     get { return this.rowset; } 
     set { this.rowset = value; } 
    } 

    public override string ToString() 
    { 
     using (var sw = new StringWriter()) 
     using (var xw = new XmlTextWriter(sw)) 
     { 
      WriteXml(xw); 
      xw.Flush(); 
      sw.Flush(); 
      return sw.ToString(); 
     } 
    } 

    public bool IsNull 
    { 
     get { return (this.rowset == null);} 
    } 

    public static CompressedRowset Null 
    { 
     get 
     { 
      CompressedRowset h = new CompressedRowset(); 
      return h; 
     } 
    } 

    public static CompressedRowset Parse(SqlString s) 
    { 
     using (var sr = new StringReader(s.Value)) 
     using (var xr = new XmlTextReader(sr)) 
     { 
      var c = new CompressedRowset(); 
      c.ReadXml(xr); 
      return c; 
     } 
    } 


    #region "Stream Wrappers" 
    abstract class WrapperStream : Stream 
    { 
     public override bool CanSeek 
     { 
      get { return false; } 
     } 

     public override bool CanWrite 
     { 
      get { return false; } 
     } 

     public override void Flush() 
     { 

     } 

     public override long Length 
     { 
      get { throw new NotImplementedException(); } 
     } 

     public override long Position 
     { 
      get 
      { 
       throw new NotImplementedException(); 
      } 
      set 
      { 
       throw new NotImplementedException(); 
      } 
     } 


     public override long Seek(long offset, SeekOrigin origin) 
     { 
      throw new NotImplementedException(); 
     } 

     public override void SetLength(long value) 
     { 
      throw new NotImplementedException(); 
     } 


    } 

    class BinaryWriterStream : WrapperStream 
    { 
     BinaryWriter br; 
     public BinaryWriterStream(BinaryWriter br) 
     { 
      this.br = br; 
     } 
     public override bool CanRead 
     { 
      get { return false; } 
     } 
     public override bool CanWrite 
     { 
      get { return true; } 
     } 
     public override int Read(byte[] buffer, int offset, int count) 
     { 
      throw new NotImplementedException(); 
     } 
     public override void Write(byte[] buffer, int offset, int count) 
     { 
      br.Write(buffer, offset, count); 
     } 
    } 

    class BinaryReaderStream : WrapperStream 
    { 
     BinaryReader br; 
     public BinaryReaderStream(BinaryReader br) 
     { 
      this.br = br; 
     } 
     public override bool CanRead 
     { 
      get { return true; } 
     } 
     public override bool CanWrite 
     { 
      get { return false; } 
     } 
     public override int Read(byte[] buffer, int offset, int count) 
     { 
      return br.Read(buffer, offset, count); 
     } 
     public override void Write(byte[] buffer, int offset, int count) 
     { 
      throw new NotImplementedException(); 
     } 
    } 
    #endregion 

    #region "IBinarySerialize" 
    public void Read(System.IO.BinaryReader r) 
    { 
     using (var rs = new BinaryReaderStream(r)) 
     using (var cs = new GZipStream(rs, CompressionMode.Decompress)) 
     { 
      var ser = new BinaryFormatter(); 
      this.rowset = (DataTable)ser.Deserialize(cs); 
     } 
    } 
    public void Write(System.IO.BinaryWriter w) 
    { 
     if (this.IsNull) 
      return; 

     rowset.RemotingFormat = SerializationFormat.Binary; 
     var ser = new BinaryFormatter(); 
     using (var binaryWriterStream = new BinaryWriterStream(w)) 
     using (var compressionStream = new GZipStream(binaryWriterStream, CompressionMode.Compress)) 
     { 
      ser.Serialize(compressionStream, rowset); 
     } 

    } 

    #endregion 

    /// <summary> 
    /// This procedure takes an arbitrary query, runs it and compresses the results into a varbinary(max) blob. 
    /// If the query has a large result set, then this procedure will use a large amount of memory to buffer the results in 
    /// a DataTable, and more to copy it into a compressed buffer to return. 
    /// </summary> 
    /// <param name="query"></param> 
    /// <param name="results"></param> 
    //[Microsoft.SqlServer.Server.SqlProcedure] 
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)] 
    public static CompressedRowset CompressQueryResults(string query) 
    { 
     //open a context connection 
     using (var con = new SqlConnection("Context Connection=true")) 
     { 
      con.Open(); 
      var cmd = new SqlCommand(query, con); 
      var dt = new DataTable(); 
      using (var rdr = cmd.ExecuteReader()) 
      { 
       dt.Load(rdr); 
      } 
      //configure the DataTable for binary serialization 
      dt.RemotingFormat = SerializationFormat.Binary; 
      var bf = new BinaryFormatter(); 

      var cdt = new CompressedRowset(); 
      cdt.rowset = dt; 
      return cdt; 


     } 
    } 

    /// <summary> 
    /// partial Type mapping between SQL and .NET 
    /// </summary> 
    /// <param name="t"></param> 
    /// <returns></returns> 
    static SqlDbType ToSqlType(Type t) 
    { 
     if (t == typeof(int)) 
     { 
      return SqlDbType.Int; 
     } 
     if (t == typeof(string)) 
     { 
      return SqlDbType.NVarChar; 
     } 
     if (t == typeof(Boolean)) 
     { 
      return SqlDbType.Bit; 
     } 
     if (t == typeof(decimal)) 
     { 
      return SqlDbType.Decimal; 
     } 
     if (t == typeof(float)) 
     { 
      return SqlDbType.Real; 
     } 
     if (t == typeof(double)) 
     { 
      return SqlDbType.Float; 
     } 
     if (t == typeof(DateTime)) 
     { 
      return SqlDbType.DateTime; 
     } 
     if (t == typeof(Int64)) 
     { 
      return SqlDbType.BigInt; 
     } 
     if (t == typeof(Int16)) 
     { 
      return SqlDbType.SmallInt; 
     } 
     if (t == typeof(byte)) 
     { 
      return SqlDbType.TinyInt; 
     } 
     if (t == typeof(Guid)) 
     { 
      return SqlDbType.UniqueIdentifier; 
     } 
     //!!!!!!!!!!!!!!!!!!! 
     if (t == typeof(Byte[])) 
     { 
      return SqlDbType.VarBinary; 
     } 
     else 
     { 
      throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion"); 
     } 

    } 

    /// <summary> 
    /// This stored procedure takes a compressed DataTable and returns it as a resultset to the clinet 
    /// or into a table using exec .... into ... 
    /// </summary> 
    /// <param name="results"></param> 
    [Microsoft.SqlServer.Server.SqlProcedure] 
    public static void UnCompressRowset(CompressedRowset results) 
    { 
     if (results.IsNull) 
      return; 

     DataTable dt = results.rowset; 
     var fields = new SqlMetaData[dt.Columns.Count]; 
     for (int i = 0; i < dt.Columns.Count; i++) 
     { 
      var col = dt.Columns[i]; 
      var sqlType = ToSqlType(col.DataType); 
      var colName = col.ColumnName; 
      if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary) 
      { 
       fields[i] = new SqlMetaData(colName, sqlType, col.MaxLength); 
      } 
      else 
      { 
       fields[i] = new SqlMetaData(colName, sqlType); 
      } 
     } 
     var record = new SqlDataRecord(fields); 

     SqlContext.Pipe.SendResultsStart(record); 
     foreach (DataRow row in dt.Rows) 
     { 
      record.SetValues(row.ItemArray); 
      SqlContext.Pipe.SendResultsRow(record); 
     } 
     SqlContext.Pipe.SendResultsEnd(); 

    } 

    public System.Xml.Schema.XmlSchema GetSchema() 
    { 
     return null; 
    } 

    public void ReadXml(System.Xml.XmlReader reader) 
    { 
     if (rowset != null) 
     { 
      throw new InvalidOperationException("rowset already read"); 
     } 
     var ser = new XmlSerializer(typeof(DataTable)); 
     rowset = (DataTable)ser.Deserialize(reader); 
    } 

    public void WriteXml(System.Xml.XmlWriter writer) 
    { 
     if (String.IsNullOrEmpty(rowset.TableName)) 
      rowset.TableName = "Rows"; 

     var ser = new XmlSerializer(typeof(DataTable)); 
     ser.Serialize(writer, rowset); 
    } 
} 

và ở đây là tạo đối tượng SQL:

CREATE TYPE [dbo].[CompressedRowset] 
    EXTERNAL NAME [CompressedRowset].[CompressedRowset]; 

GO 

CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000)) 
RETURNS [dbo].[CompressedRowset] 
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults]; 

GO 

CREATE PROCEDURE [dbo].[UnCompressRowset] @results [dbo].[CompressedRowset] 
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset]; 

GO 
+0

Tuần tự hóa tạo ra sản lượng khá lớn và GZIpStream tích hợp nén kém. Không có thắc mắc kết quả không chính xác nổi bật. Tôi muốn cố gắng tuần tự hóa các đối tượng [] như các hàng để loại bỏ DataTable. – usr

+0

Tôi đoán ý tưởng trong ví dụ này là để nén hàng thiết lập để có thêm thông tin để nén (từ đây tỷ lệ nén tốt hơn), phải không? Ngoài ra, tôi không thể tìm thấy bất kỳ GZip alterntaives, bạn có thể đề xuất như vậy? – gotqn

+0

Tôi không thể đóng câu hỏi này bởi vì nó có tiền thưởng mở, nhưng tôi muốn đóng nó với lý do này * Quá rộng: có quá nhiều câu trả lời có thể, hoặc câu trả lời hay sẽ quá dài đối với định dạng này. Vui lòng thêm chi tiết để thu hẹp bộ câu trả lời hoặc cô lập một vấn đề có thể được trả lời trong một vài đoạn văn. * Vì vậy, vui lòng đưa ra các câu hỏi cụ thể hơn. Tôi có câu trả lời cho một số chủ đề trong mẫu, mã, ý kiến, lo lắng khổng lồ, không có thứ tự này ... Xin vui lòng, cải thiện nó. – JotaBe

Trả lời

0

Bạn có thay vì xem xét việc tạo một cơ sở dữ liệu 'Lưu trữ' mới (có thể được đặt thành mô hình khôi phục đơn giản), nơi bạn đổ tất cả dữ liệu cũ của mình? Điều đó có thể dễ dàng được truy cập trong các truy vấn để không có đau ở đó, ví dụ:

SELECT * FROM archive..olddata 

Khi bạn tạo các db, đặt nó trên đĩa khác, và xử lý nó khác nhau trong quy trình sao lưu của bạn - có lẽ bạn làm các thủ tục lưu trữ mỗi tuần một lần, sau đó chỉ cần được backuped sau đó - và sau bạn đã đè bẹp nó đến gần như bằng không với 7zip/rar.

Đừng cố gắng để nén các db sử dụng NTFS nén tuy nhiên, SQL server không hỗ trợ nó - thấy rằng ra bản thân mình, rất rất cuối buổi tối :)

1

Có lẽ đã quá muộn cho bản gốc câu hỏi, nhưng điều này có thể đáng xem xét cho những người khác lo ngại: trong SQL Server 2016 có các chức năng nén và giải nén (xem herehere) có thể hữu ích ở đây nếu dữ liệu bạn đang cố gắng lưu trữ chứa các giá trị lớn trong các cột [N]VARCHARVARBINARY .

Bạn sẽ cần phải đưa lớp này vào lớp logic nghiệp vụ hoặc tạo một số sắp xếp trong SQL Server, nhờ đó bạn sao chép bảng không nén của bạn dưới dạng khung nhìn vào bảng sao lưu (nơi giá trị được nén) và lấy dữ liệu chưa nén qua DECOMPRESS và có các kích hoạt INSTEAD OF cập nhật bảng sao lưu (do đó chế độ xem hoạt động như bảng gốc để chọn/chèn/cập nhật/xóa ngoài các khác biệt hiệu suất). Một chút hacky, nhưng nó sẽ làm việc ...

Đối với các phiên bản SQL cũ hơn, bạn có thể có thể viết một hàm CLR để thực hiện công việc.

Phương pháp này rõ ràng sẽ không hoạt động cho tập hợp dữ liệu bao gồm các trường nhỏ, kiểu nén này đơn giản sẽ không đạt được bất kỳ giá trị nhỏ nào (thực tế nó sẽ làm cho chúng lớn hơn).

+0

Có, cuối cùng họ đã thêm chức năng này làm chức năng tích hợp sẵn. – gotqn

Các vấn đề liên quan