2013-09-24 40 views
6

Tôi có một mảng thưa thớt a (chủ yếu là zero):thưa thớt nén mảng sử dụng SIMD (AVX2)

unsigned char a[1000000]; 

và tôi muốn tạo ra một mảng b các chỉ số đến các yếu tố khác không của a sử dụng hướng dẫn SIMD trên kiến ​​trúc Intel x64 với AVX2. Tôi đang tìm kiếm lời khuyên làm thế nào để làm điều đó một cách hiệu quả. Cụ thể, có các chỉ lệnh SIMD để nhận vị trí của các phần tử khác không liên tiếp trong thanh ghi SIMD, được sắp xếp liên tục không?

+1

Không trực tiếp, nhưng bạn có thể 'pcmpeqb' nó chống lại số không, sau đó' pmovmskb' để đăng ký bình thường, và trích xuất chỉ mục đầu tiên với 'bsf' (và sau đó thứ hai và vv, hy vọng không quá nhiều) – harold

+1

Bạn sẽ cần phải cụ thể hơn chỉ là 'SIMD' - bạn đang nhắm đến kiến ​​trúc nào?x86, ARM, PowerPC, POWER và một số GPGPUs tất cả đều có các phần mở rộng SIMD khác nhau. Ngoài ra x86 có nhiều phần mở rộng SIMD: MMX, SSE, SSE2, SSE3, SSSE3, SSE4, AVX, AVX2, v.v. (Lưu ý rằng AVX2 có các chỉ lệnh SIMD có thể hữu ích trong ngữ cảnh này). –

+0

@Paul R Xin lỗi về điều đó. Tôi đã chỉnh sửa câu hỏi của mình - AVX2 có thể chấp nhận được. –

Trả lời

0

Mặc dù tập lệnh AVX2 có nhiều chỉ dẫn GATHER nhưng hiệu suất của nó quá chậm. Và cách hiệu quả nhất để thực hiện việc này - để xử lý một mảng theo cách thủ công.

1

Nếu bạn mong đợi số yếu tố khác không phải rất thấp (tức là ít hơn nhiều so với 1%), sau đó bạn chỉ có thể kiểm tra từng đoạn 16-byte cho là nonzero:

int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
    if (mask != 65535) { 
     //store zero bits of mask with scalar code 
    } 

Nếu tỷ lệ phần trăm của các nguyên tố tốt là đủ nhỏ, chi phí của các chi nhánh bị lỗi và chi phí của mã vô hướng chậm bên trong 'if' sẽ không đáng kể.


Để có giải pháp chung tốt, trước tiên hãy xem xét việc triển khai SSE của nén chặt. Nó loại bỏ tất cả các yếu tố từ zero mảng byte (ý tưởng lấy từ here):

__m128i shuf [65536]; //must be precomputed 
char cnt [65536]; //must be precomputed 

int compress(const char *src, int len, char *dst) { 
    char *ptr = dst; 
    for (int i = 0; i < len; i += 16) { 
     __m128i reg = _mm_load_si128((__m128i*)&src[i]); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]); 
     _mm_storeu_si128((__m128i*)ptr, compressed); 
     ptr += cnt[mask]; //alternative: ptr += 16-_mm_popcnt_u32(mask); 
    } 
    return ptr - dst; 
} 

Như bạn thấy, (_mm_shuffle_epi8 + bảng tra cứu) có thể làm điều kỳ diệu. Tôi không biết bất kỳ cách nào khác để vector hóa mã phức tạp về cấu trúc như nén chặt luồng.


Bây giờ, vấn đề còn lại duy nhất với yêu cầu của bạn là bạn muốn lấy chỉ mục. Mỗi chỉ mục phải được lưu trữ trong giá trị 4 byte, do đó, một đoạn 16 byte đầu vào có thể tạo ra tới 64 byte đầu ra, không phù hợp với thanh ghi SSE đơn.

Một cách để xử lý việc này là giải nén đầu ra thành công đến 64 byte. Vì vậy, bạn thay thế reg bằng hằng số (0,1,2,3,4, ..., 15) trong mã, sau đó giải nén thanh ghi SSE thành 4 thanh ghi và thêm đăng ký với bốn giá trị i. Điều này sẽ mất nhiều hướng dẫn hơn: 6 giải nén hướng dẫn, 4 bổ sung, và 3 cửa hàng (một là đã có). Đối với tôi, đó là một chi phí rất lớn, đặc biệt là nếu bạn mong đợi ít hơn 25% các yếu tố nonzero.

Ngoài ra, bạn có thể giới hạn số lượng byte không đồng bộ được xử lý bằng lặp vòng lặp đơn 4, sao cho một thanh ghi luôn đủ cho đầu ra. Đây là đoạn mã mẫu:

__m128i shufMask [65536]; //must be precomputed 
char srcMove [65536]; //must be precomputed 
char dstMove [65536]; //must be precomputed 

int compress_ids(const char *src, int len, int *dst) { 
    const char *ptrSrc = src; 
    int *ptrDst = dst; 
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15); 
    __m128i base = _mm_setzero_si128(); 
    while (ptrSrc < src + len) { 
     __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]); 
     __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128()); 
     ids32 = _mm_add_epi32(ids32, base); 
     _mm_storeu_si128((__m128i*)ptrDst, ids32); 
     ptrDst += dstMove[mask]; //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4); 
     ptrSrc += srcMove[mask]; //no alternative without LUT 
     base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask])); 
    } 
    return ptrDst - dst; 
} 

Một nhược điểm của phương pháp này là bây giờ mỗi lần lặp tiếp theo không thể bắt đầu cho đến khi dòng ptrDst += dstMove[mask]; được thực hiện trên phiên trước. Vì vậy, con đường quan trọng đã tăng lên đáng kể. Phần cứng siêu phân luồng hoặc mô phỏng thủ công của nó có thể loại bỏ hình phạt này.


Vì vậy, như bạn thấy, có nhiều biến thể của ý tưởng cơ bản này, tất cả đều giải quyết vấn đề của bạn với mức độ hiệu quả khác nhau. Bạn cũng có thể giảm kích thước của LUT nếu bạn không thích nó (một lần nữa, với chi phí giảm hiệu suất thông lượng).

Cách tiếp cận này không thể được mở rộng hoàn toàn cho các thanh ghi rộng hơn (tức làAVX2 và AVX-512), nhưng bạn có thể cố gắng kết hợp các lệnh của một số lần lặp liên tiếp thành một lệnh AVX2 hoặc AVX-512, do đó tăng thông lượng.

Lưu ý: Tôi đã không kiểm tra bất kỳ mã nào (bởi vì LUT precomputing yêu cầu chính xác đáng chú ý).

+0

Nó sẽ là tốt đẹp để xem cách tiếp cận LUT của bạn so sánh với [answer] của tôi (http://stackoverflow.com/a/41958528/2439725) dựa trên Hướng dẫn thao tác bit (BMI1 và BMI2). – wim

2

Năm phương pháp để tính toán các chỉ số của nonzeros là:

  • Semi vectorized loop: Load một vector với chars SIMD, so sánh với số không và áp dụng một movemask. Sử dụng vòng lặp vô hướng nhỏ nếu bất kỳ ký tự nào không phải là số (cũng được đề xuất bởi @stgatilov). Điều này hoạt động tốt cho các mảng rất thưa thớt. Hàm arr2ind_movmsk trong mã bên dưới sử dụng hướng dẫn BMI1 cho vòng lặp vô hướng.

  • Vòng lặp Vectorized: Bộ vi xử lý Intel Haswell và hỗ trợ mới hơn các bộ chỉ lệnh BMI1 và BMI2. BMI2 chứa hướng dẫn pext (Parallel bits extract, see wikipedia link), hóa ra hữu ích ở đây. Xem arr2ind_pext trong mã bên dưới.

  • Vòng vô hướng cổ điển với câu lệnh if: arr2ind_if.

  • Vòng tròn không có nhánh: arr2ind_cmov.

  • Bảng tìm kiếm: @stgatilov cho thấy có thể sử dụng bảng tra cứu thay vì pdep và các số nguyên khác hướng dẫn. Điều này có thể làm việc tốt, tuy nhiên, bảng tra cứu là khá lớn: nó không phù hợp trong bộ nhớ cache L1. Không được kiểm tra ở đây. Xem thêm phần thảo luận here.


/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c 

example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros: 
       ./a.out 20000 25 
*/ 

#include <stdio.h> 
#include <immintrin.h> 
#include <stdint.h> 
#include <omp.h> 
#include <string.h> 


__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0, k; 
    __m256i msk; 
    m0=0; 
    for (i=0;i<n;i=i+32){        /* Load 32 bytes and compare with zero:   */ 
     msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256()); 
     k=_mm256_movemask_epi8(msk); 
     k=~k;           /* Search for nonzero bits instead of zero bits. */ 
     while (k){ 
     ind[m0]=i+_tzcnt_u32(k);      /* Count the number of trailing zero bits in k. */ 
     m0++; 
     k=_blsr_u32(k);        /* Clear the lowest set bit in k.     */ 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    uint64_t  cntr_const = 0xFEDCBA; 
    __m256i  shft  = _mm256_set_epi64x(0x04,0x00,0x04,0x00); 
    __m256i  vmsk  = _mm256_set1_epi8(0x0F); 
    __m256i  cnst16  = _mm256_set1_epi32(16); 
    __m256i  shf_lo  = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02, 
              0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00); 
    __m256i  shf_hi  = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06, 
              0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04); 
    __m128i  pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);            

    __m256i  i_vec  = _mm256_setzero_si256(); 
    m0=0; 
    for (i=0;i<n;i=i+16){ 
     __m128i v   = _mm_load_si128((__m128i *)&a[i]);      /* Load 16 bytes.                    */ 
     __m128i msk  = _mm_cmpeq_epi8(v,_mm_setzero_si128());    /* Generate 16x8 bit mask.                  */ 
       msk  = _mm_srli_epi64(msk,4);        /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_shuffle_epi8(msk,pshufbcnst);      /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_xor_si128(msk,_mm_set1_epi32(-1));    /* Invert 16x4 mask.                   */ 
     uint64_t msk64  = _mm_cvtsi128_si64x(msk);        /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/ 
     int  p   = _mm_popcnt_u64(msk64)>>2;        /* p is the number of nonzeros in 16 bytes of a.            */ 
     uint64_t cntr  = _pext_u64(cntr_const,msk64);       /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */ 
                        /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.     */ 
     __m256i cntr256 = _mm256_set1_epi64x(cntr); 
       cntr256 = _mm256_srlv_epi64(cntr256,shft); 
       cntr256 = _mm256_and_si256(cntr256,vmsk); 
     __m256i cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo); 
     __m256i cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi); 
       cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo); 
       cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi); 

          _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);  /* Note that the stores of iteration i and i+16 may overlap.               */ 
          _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi); /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
       m0   = m0+p; 
       i_vec  = _mm256_add_epi32(i_vec,cnst16); 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     if (a[i]!=0){ 
     ind[m0]=i; 
     m0=m0+1; 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     ind[m0]=i; 
     m0=(a[i]==0)? m0 : m0+1; /* Compiles to cmov instruction. */ 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i; 
    for (i=0;i<m;i++) printf("i=%d, ind[i]=%d a[ind[i]]=%u\n",i,ind[i],a[ind[i]]); 
    printf("\n"); fflush(stdout); 
    return 0; 
} 


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i;        /* Compute a hash to compare the results of different methods. */ 
    unsigned int chk=0; 
    for (i=0;i<m;i++){ 
     chk=((chk<<1)|(chk>>31))^(ind[i]); 
    } 
    printf("chk = %10X\n",chk); 
    return 0; 
} 



int main(int argc, char **argv){ 
int n, i, m; 
unsigned int j, k, d; 
unsigned char *a; 
int *ind; 
double t0,t1; 
int meth, nrep; 
char txt[30]; 

sscanf(argv[1],"%d",&n);   /* Length of array a.         */ 
n=n>>5;        /* Adjust n to a multiple of 32.       */ 
n=n<<5; 
sscanf(argv[2],"%u",&d);   /* The approximate fraction of nonzeros in a is: d/1024 */ 
printf("n=%d, d=%u\n",n,d); 

a=_mm_malloc(n*sizeof(char),32); 
ind=_mm_malloc(n*sizeof(int),32);  

            /* Generate a pseudo random array a.      */ 
j=73659343;     
for (i=0;i<n;i++){ 
    j=j*653+1; 
    k=(j & 0x3FF00)>>8;    /* k is a pseudo random number between 0 and 1023  */ 
    if (k<d){ 
     a[i] = (j&0xFE)+1;   /* Set a[i] to nonzero.         */ 
    }else{ 
     a[i] = 0; 
    } 

} 

/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%u\n",i,a[i]);}} printf("\n"); */ /* Uncomment this line to print the nonzeros in a. */ 

char txt0[]="arr2ind_movmsk: "; 
char txt1[]="arr2ind_pext: "; 
char txt2[]="arr2ind_if:  "; 
char txt3[]="arr2ind_cmov: "; 

nrep=10000;         /* Repeat a function nrep times to make relatively accurate timings possible.       */ 
               /* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 519     */ 
               /* With nrep=10000:  ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513     */ 
printf("nrep = \%d \n\n",nrep); 
arr2ind_movmsk(a,n,ind,&m);     /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */ 
for (meth=0;meth<4;meth++){ 
    t0=omp_get_wtime(); 
    switch (meth){ 
     case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);   strcpy(txt,txt0); break; 
     case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);   strcpy(txt,txt1); break; 
     case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);    strcpy(txt,txt2); break; 
     case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);   strcpy(txt,txt3); break; 
     default: ; 
    } 
    t1=omp_get_wtime(); 
    printf("method = %s ",txt); 
    /* print_chk(a,ind,m); */ 
    printf(" elapsed time = %6.2f\n",t1-t0); 
} 
print_nonz(a, ind, 2);           /* Do something with the results     */ 
printf("density = %f %% \n\n",((double)m)/((double)n)*100);  /* Actual nonzero density of array a.   */ 

/* print_nonz(a, ind, m); */ /* Uncomment this line to print the indices of the nonzeros.      */ 

return 0; 
} 

/* 
With nrep=1000000: 
./a.out 10016 4 ; ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 48 ; ./a.out 10016 519 ; ./a.out 10016 519  
With nrep=10000: 
./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513  
*/ 


Mã này đã được thử nghiệm với kích thước mảng của n = 10016 (các dữ liệu phù hợp trong bộ nhớ cache L1) và n = 1000000, với mật độ khác không khác nhau khoảng 0,5%, 5% và 50%. Đối với thời gian chính xác các chức năng được gọi là 1000000 và 10000 lần, tương ứng.


Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500 
        0.53%  5.1%  50.0% 
arr2ind_movmsk:  0.27  0.53  4.89 
arr2ind_pext:   1.44  1.59  1.45 
arr2ind_if:   5.93  8.95  33.82 
arr2ind_cmov:   6.82  6.83  6.82 

Time in seconds, size n=1000000, 1e4 function calls. 

        0.49%  5.1%  50.1% 
arr2ind_movmsk:  0.57  2.03  5.37 
arr2ind_pext:   1.47  1.47  1.46 
arr2ind_if:   5.88  8.98  38.59 
arr2ind_cmov:   6.82  6.81  6.81 


Trong những ví dụ các vòng vectorized là nhanh hơn so với các vòng vô hướng. Hiệu suất của arr2ind_movmsk phụ thuộc rất nhiều vào mật độ a. Chỉ có nhanh hơn arr2ind_pext nếu mật độ đủ nhỏ. Điểm hòa vốn cũng phụ thuộc vào kích thước mảng n. Chức năng 'arr2ind_if' rõ ràng bị lỗi dự đoán nhánh ở 50% mật độ không đông.

Các vấn đề liên quan