2013-07-24 22 views
5

Trong mã của tôi, tôi phải xử lý "unmasking" của gói websocket, mà về cơ bản có nghĩa là XOR'ing dữ liệu chưa được sắp xếp có độ dài tùy ý. Nhờ có SO (Websocket data unmasking/multi byte xor) Tôi đã tìm ra cách (hy vọng) tăng tốc độ này bằng cách sử dụng phần mở rộng SSE2/AVX2, nhưng nhìn vào nó bây giờ, có vẻ như với tôi rằng việc xử lý dữ liệu không được ký hiệu của tôi hoàn toàn phụ tối ưu. Có cách nào để tối ưu hóa mã của tôi hay ít nhất là làm cho nó đơn giản hơn với cùng một hiệu suất, hay là mã của tôi đã hoạt động tốt nhất?tối ưu hóa SSE2/AVX2 XOR

Đây là phần quan trọng của mã (đối với câu hỏi tôi giả định rằng dữ liệu sẽ luôn đủ ít nhất để chạy chu trình AVX2 một lần, nhưng đồng thời phần lớn nó sẽ chỉ chạy một vài lần) :

// circular shift left for uint32 
int cshiftl_u32(uint32_t num, uint8_t shift) { 
    return (num << shift) | (num >> (32 - shift));                  
}                              

// circular shift right for uint32 
int cshiftr_u32(uint32_t num, uint8_t shift) { 
    return (num >> shift) | (num << (32 - shift));                  
}                              

void optimized_xor_32(uint32_t mask, uint8_t *ds, uint8_t *de) { 
    if (ds == de) return; // zero data len -> nothing to do 

    uint8_t maskOffset = 0; 

// process single bytes till 4 byte alignment (<= 3) 
    for (; ds < de && ((uint64_t)ds & (uint64_t)3); ds++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

    if (ds == de) return; // done, return 

    if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions 
     mask = cshiftl_u32(mask, maskOffset); 

     maskOffset = 0; 
    } 

// process 4 byte block till 8 byte alignment (<= 1) 
    uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31)); 

    if (ds < de32 && ((uint64_t)de & (uint64_t)7)) { 
     *(uint32_t *)ds ^= mask; // mask is uint32_t 

     if (++ds == de) return; 
    } 

// process 8 byte block till 16 byte alignment (<= 1) 
    uint64_t mask64 = mask | (mask << 4); 
    uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63)); 

    if (ds < de64 && ((uint64_t)ds & (uint64_t)15)) { 
     *(uint64_t *)ds ^= mask64; 

     if (++ds == de) return; // done, return 
    } 


// process 16 byte block till 32 byte alignment (<= 1) (if supported) 
#ifdef CPU_SSE2 
    __m128i v128, v128_mask; 
    v128_mask = _mm_set1_epi32(mask); 

    uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127)); 

    if (ds < de128 && ((uint64_t)ds & (uint64_t)31)) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 

     if (++ds == de) return; // done, return 
    } 

#endif 
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards) 
    __m256i v256, v256_mask; 
    v256_mask = _mm256_set1_epi32(mask); 

    uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255)); 

    for (; ds < de256; ds+=32) { 
     v256 = _mm256_load_si256((__m256i *)ds); 
     v256 = _mm256_xor_si256(v256, v256_mask); 
     _mm256_store_si256((__m256i *)ds, v256); 
    } 

    if (ds == de) return; // done, return 
#endif 
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported) 
    for (; ds < de128; ds+=16) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 
    } 

    if (ds == de) return; // done, return 

#endif 
    // process remaining 8 byte blocks 
    // this should always be supported, so remaining can be assumed to be executed <= 1 times 
    for (; ds < de64; ds += 8) { 
     *(uint64_t *)ds ^= mask64; 
    } 

    if (ds == de) return; // done, return 

    // process remaining 4 byte blocks (<= 1) 
    if (ds < de32) { 
     *(uint32_t *)ds ^= mask; 

     if (++ds == de) return; // done, return 
    } 


    // process remaining bytes (<= 3) 

    for (; ds < de; ds ++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

} 

Tái bút

+0

Bạn đã thử định thời gian mã của mình chưa? (Ngoài ra, bạn có thể muốn quấn bitwise '&' trong điều kiện của bạn bằng dấu ngoặc đơn) –

+1

Thời gian sẽ không thực sự hữu ích, vì tôi chỉ có thể đưa ra giả định về dữ liệu tôi sẽ nhận được như đầu vào, nhưng sẽ không nhận được bất kỳ đầu vào cho một vài tháng tới. Ngoài ra, tôi chỉ nhận được một số tuyệt đối với thời gian, điều này thực sự không giúp tôi vì vấn đề của tôi không tìm ra mã này cần bao lâu để thực thi với đầu vào xy, nhưng cách làm cho nó nhanh hơn, ví dụ: Tôi không có ý tưởng gì để thay đổi. Tái bút: Được bọc bitwise & để hiểu dễ dàng hơn, thx cho gợi ý! – griffin

+1

Tôi nghĩ bạn sẽ thấy rằng các quầy phụ thuộc dữ liệu lớn hơn lợi ích được căn chỉnh/không được căn chỉnh. Nếu bạn có thể hủy vòng lặp của mình gấp 2 lần, bạn sẽ thấy sự cải thiện đáng kể. – BitBank

Trả lời

2

Không giống như những gì nó nói trong phần lớn các bộ vi xử lý của Intel thực sự xử lý dữ liệu chưa được căn chỉnh. Vì bạn đang sử dụng các trình biên dịch của trình biên dịch Intel để xử lý vectơ, tôi cho rằng bạn có quyền truy cập vào phiên bản icc hợp lý gần đây.

Nếu bạn không thể căn chỉnh tự nhiên dữ liệu của mình thì tôi sợ rằng những gì bạn đang làm là gần như bạn có thể đạt được hiệu suất tối đa. Về việc làm cho mã dễ đọc hơn và có thể triển khai trên Xeon Phi (bộ đăng ký vector 64 byte)/Bộ vi xử lý vector dài hơn trong tương lai, tôi khuyên bạn nên bắt đầu sử dụng Intel Cilk Plus.

Ví dụ:

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) { 
    while (length & 0x3) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    // switch to 4 bytes per block 
    uint32_t _d = d; 
    length >>= 2; 

    // Intel Cilk Plus Array Notation 
    // Should expand automatically to the best possible SIMD instructions 
    // you are compiling for 
    _d[0:length] ^= mask; 
} 

Xin lưu ý rằng tôi đã không kiểm tra mã này như tôi không có quyền truy cập vào một trình biên dịch Intel ngay bây giờ. Nếu bạn gặp phải vấn đề thì tôi có thể đi qua nó khi tôi trở lại văn phòng của tôi vào tuần tới.

Nếu bạn khá thích intrinsics sau đó sử dụng hợp lý các macro tiền xử lý có thể dễ dàng đáng kể cuộc sống của bạn:

#if defined(__MIC__) 
// intel Xeon Phi 
#define VECTOR_BLOCKSIZE 64 
// I do not remember the correct types/instructions right now 
#error "TODO: MIC handling" 
#elif defined(CPU_AVX2) 
#define VECTOR_BLOCKSIZE 32 
typedef __m256i my_vector_t; 
#define VECTOR_LOAD_MASK _mm256_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask)) 
#elif defined(CPU_SSE2) 
#define VECTOR_BLOCKSIZE 16 
typedef __m128i my_vector_t; 
#define VECTOR_LOAD_MASK _mm128_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask)) 
#else 
#define VECTOR_BLOCKSIZE 8 
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask)) 
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask) 
typedef uint64_t my_vector_t; 
#fi 

void optimized_xor_32(uint32_t mask, uint8_t *d, size_t length) { 
    size_t i; 

    // there really is no point in having extra 
    // branches for different vector lengths if they are 
    // executed at most once 
    // branch prediction is your friend here 
    // so we do one byte at a time until the block size 
    // is reached 

    while (length && (d & (VECTOR_BLOCKSIZE - 1))) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    my_vector_t * d_vector = (my_vector_t *)d; 
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask); 

    size_t vector_legth = length/VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift 
    length &= VECTOR_BLOCKSIZE -1; // remaining length 

    for (i = 0; i < vector_legth; i++) { 
     VECTOR_XOR(d_vector + i, vector_mask); 
    } 

    // process the tail 
    d = (uint8_t*)(d_vector + i); 
    for (i = 0; i < length; i++) { 
     d[i] ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); 
    } 

} 

Ngày lưu ý khác: Bạn có thể muốn sử dụng x86 xoay hướng dẫn thay vì chút chuyển sang xoay mask:

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc") 
+0

Tôi không sử dụng icc chút nào ngoài gcc và tôi không có bất kỳ loại quyền truy cập đặc biệt nào đối với icc. Tuy nhiên, không biết về lệnh xoay, phải tra cứu chính xác nó là gì, thx! – griffin

+0

@griffin OK, tôi đã được ấn tượng rằng '_mm_load_si128' và gia đình là một' icc' được xây dựng trong. Trong trường hợp đó, bạn nên lấy đoạn mã thứ hai của tôi, chỉ cần không có phần cho MIC. Đáng buồn là không có nội tại cho các hướng dẫn xoay, tôi biết mặc dù rằng 'htons' sử dụng xoay 2 byte. –

+0

Được bình chọn, nhưng sẽ phải thử nó khi tôi có thời gian, có lẽ sẽ không xảy ra sớm, nhưng tôi chắc chắn sẽ chấp nhận điều này khi tôi thử nghiệm nó hoạt động và hoạt động tốt. Cảm ơn bạn đã chờ đợi! – griffin