2013-05-29 43 views
8

Tôi có một ma trận thưa thớt lớn X ở định dạng scipy.sparse.csr_matrix và tôi muốn nhân cái này với một mảng khối u W sử dụng tính song song. Sau một số nghiên cứu tôi phát hiện ra tôi cần sử dụng Mảng trong đa xử lý để tránh sao chép X và W giữa các quá trình (ví dụ: tại đây: How to combine Pool.map with Array (shared memory) in Python multiprocessing?Is shared readonly data copied to different processes for Python multiprocessing?). Đây là nỗ lực mới nhất của tôiLàm thế nào để song song các phép nhân ma trận thưa thớt scipy

import multiprocessing 
import numpy 
import scipy.sparse 
import time 

def initProcess(data, indices, indptr, shape, Warr, Wshp): 
    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    XData = data 
    XIndices = indices 
    XIntptr = indptr 
    Xshape = shape 

    global WArray 
    global WShape 

    WArray = Warr  
    WShape = Wshp 

def dot2(args): 
    rowInds, i = args  

    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    data = numpy.frombuffer(XData, dtype=numpy.float) 
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32) 
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32) 
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape) 

    global WArray 
    global WShape 
    W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape) 

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W) 

def getMatmat(X): 
    numJobs = multiprocessing.cpu_count() 
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int) 

    #Store the data in X as RawArray objects so we can share it amoung processes 
    XData = multiprocessing.RawArray("d", X.data) 
    XIndices = multiprocessing.RawArray("i", X.indices) 
    XIndptr = multiprocessing.RawArray("i", X.indptr) 

    def matmat(W): 
     WArray = multiprocessing.RawArray("d", W.flatten()) 
     pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape)) 
     params = [] 

     for i in range(numJobs): 
      params.append((rowInds, i)) 

     iterator = pool.map(dot2, params) 
     P = numpy.zeros((X.shape[0], W.shape[1])) 

     for i in range(numJobs): 
      P[rowInds[i]:rowInds[i+1], :] = iterator[i] 

     return P 

    return matmat 

if __name__ == '__main__': 
    #Create a random sparse matrix X and a random dense one W  
    X = scipy.sparse.rand(10000, 8000, 0.1) 
    X = X.tocsr() 
    W = numpy.random.rand(8000, 20) 

    startTime = time.time() 
    A = getMatmat(X)(W) 
    parallelTime = time.time()-startTime 

    startTime = time.time() 
    B = X.dot(W) 
    nonParallelTime = time.time()-startTime 

    print(parallelTime, nonParallelTime) 

Tuy nhiên, đầu ra giống như sau: (4.431, 0.165) cho biết phiên bản song song chậm hơn nhiều so với phép nhân song song.

Tôi tin rằng sự chậm lại có thể được gây ra trong các tình huống tương tự khi đang sao chép dữ liệu lớn vào quy trình, nhưng đây không phải là trường hợp ở đây khi tôi sử dụng Array để lưu trữ các biến được chia sẻ (trừ khi nó xảy ra trong numpy.frombuffer hoặc khi nào tạo csr_matrix, nhưng sau đó tôi không thể tìm cách chia sẻ trực tiếp csr_matrix). Một nguyên nhân khác có thể có của tốc độ chậm là trả về một kết quả lớn của mỗi phép nhân ma trận cho mỗi quá trình tuy nhiên tôi không chắc chắn về một cách xung quanh điều này.

Ai đó có thể thấy tôi đang đi sai ở đâu? Cảm ơn bạn đã giúp đỡ!

Cập nhật: Tôi không chắc chắn nhưng tôi cho rằng việc chia sẻ lượng lớn dữ liệu giữa các quy trình không hiệu quả và lý tưởng nhất là tôi nên sử dụng đa luồng (mặc dù Khóa thông dịch toàn cầu (GIL) làm cho điều đó rất khó). Một cách xung quanh điều này là giải phóng GIL bằng cách sử dụng Cython ví dụ (xem http://docs.cython.org/src/userguide/parallelism.html), mặc dù rất nhiều hàm numpy cần phải đi qua GIL.

+0

Bạn có liên quan đến mật độ/scipy với một bản dựng ATLAS được tối ưu hóa, đa luồng không?Nếu bạn làm điều đó, bạn sẽ nhận được phép nhân song song miễn phí khi bạn sử dụng np.dot. –

+1

Tôi đang sử dụng một thư viện BLAS đa luồng (OpenBLAS) được liên kết với numpy/scipy nhưng tôi đã thử nghiệm X.dot (W) và numpy.dot (X, W) (sau này không hoạt động cho thưa thớt X) và điều này không song song. – Charanpal

Trả lời

1

Đặt cược tốt nhất của bạn là thả xuống C bằng Cython. Bằng cách đó bạn có thể đánh bại GIL và sử dụng OpenMP. Tôi không ngạc nhiên khi đa xử lý chậm hơn - có rất nhiều chi phí ở đó.

Dưới đây là một trình bao bọc OpenMP ngây thơ bao bọc của ma trận thưa thớt CSparse - mã sản phẩm vector trong python.

Trên máy tính xách tay của tôi, điều này chạy nhanh hơn một chút so với scipy. Nhưng tôi không có nhiều lõi. Mã, bao gồm cả tập lệnh setup.py và các tệp tiêu đề C và nội dung có trong gist này: https://gist.github.com/rmcgibbo/6019670

Tôi nghi ngờ rằng nếu bạn thực sự muốn mã song song nhanh (trên máy tính xách tay của tôi, nó chỉ nhanh hơn khoảng 20%) hơn scipy đơn luồng, ngay cả khi sử dụng 4 chủ đề), bạn cần phải suy nghĩ cẩn thận hơn về nơi mà sự song song xảy ra hơn tôi đã làm, chú ý đến vùng nhớ cache.

# psparse.pyx 

#----------------------------------------------------------------------------- 
# Imports 
#----------------------------------------------------------------------------- 
cimport cython 
cimport numpy as np 
import numpy as np 
import scipy.sparse 
from libc.stddef cimport ptrdiff_t 
from cython.parallel import parallel, prange 

#----------------------------------------------------------------------------- 
# Headers 
#----------------------------------------------------------------------------- 

ctypedef int csi 

ctypedef struct cs: 
    # matrix in compressed-column or triplet form 
    csi nzmax  # maximum number of entries 
    csi m   # number of rows 
    csi n   # number of columns 
    csi *p   # column pointers (size n+1) or col indices (size nzmax) 
    csi *i   # row indices, size nzmax 
    double *x  # numerical values, size nzmax 
    csi nz   # # of entries in triplet matrix, -1 for compressed-col 

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil 
cdef extern csi cs_print (cs *A, csi brief) nogil 

assert sizeof(csi) == 4 

#----------------------------------------------------------------------------- 
# Functions 
#----------------------------------------------------------------------------- 

@cython.boundscheck(False) 
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None): 
    """Multiply a sparse CSC matrix by a dense matrix 

    Parameters 
    ---------- 
    X : scipy.sparse.csc_matrix 
     A sparse matrix, of size N x M 
    W : np.ndarray[dtype=float564, ndim=2, mode='fortran'] 
     A dense matrix, of size M x P. Note, W must be contiguous and in 
     fortran (column-major) order. You can ensure this using 
     numpy's `asfortranarray` function. 

    Returns 
    ------- 
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran'] 
     A dense matrix, of size N x P, the result of multiplying X by W. 

    Notes 
    ----- 
    This function is parallelized over the columns of W using OpenMP. You 
    can control the number of threads at runtime using the OMP_NUM_THREADS 
    environment variable. The internal sparse matrix code is from CSPARSE, 
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis. 
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the 
    GNU LGPL v2.1+. 

    References 
    ---------- 
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems 
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136 
    """ 
    if X.shape[1] != W.shape[0]: 
     raise ValueError('matrices are not aligned') 

    cdef int i 
    cdef cs csX 
    cdef np.ndarray[double, ndim=2, mode='fortran'] result 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr = X.indptr 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices 
    cdef np.ndarray[double, ndim=1, mode = 'c'] data = X.data 

    # Pack the scipy data into the CSparse struct. This is just copying some 
    # pointers. 
    csX.nzmax = X.data.shape[0] 
    csX.m = X.shape[0] 
    csX.n = X.shape[1] 
    csX.p = &indptr[0] 
    csX.i = &indices[0] 
    csX.x = &data[0] 
    csX.nz = -1 # to indicate CSC format 

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double) 
    for i in prange(W.shape[1], nogil=True): 
     # X is in fortran format, so we can get quick access to each of its 
     # columns 
     cs_gaxpy(&csX, &W[0, i], &result[0, i]) 

    return result 

Gọi một số C từ CSparse.

// src/cs_gaxpy.c 

#include "cs.h" 
/* y = A*x+y */ 
csi cs_gaxpy (const cs *A, const double *x, double *y) 
{ 
    csi p, j, n, *Ap, *Ai ; 
    double *Ax ; 
    if (!CS_CSC (A) || !x || !y) return (0) ;  /* check inputs */ 
    n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ; 
    for (j = 0 ; j < n ; j++) 
    { 
     for (p = Ap [j] ; p < Ap [j+1] ; p++) 
     { 
     y [Ai [p]] += Ax [p] * x [j] ; 
     } 
    } 
    return (1) ; 
} 
+0

Cảm ơn bạn đã phản hồi! Tôi đã có những suy nghĩ tương tự và đã viết một sản phẩm Cython/OpenMP dot dựa trên Eigen (xem pdot2d của https://github.com/charanpald/sppy/blob/master/sppy/csarray_base.pyx). Ở đây, tôi chia các hàng X thành các khối cpu_count và nó chạy xấp xỉ 2x nhanh hơn trên máy 8 lõi của tôi (tôi chắc chắn nó có thể được cải thiện). Tôi sẽ so sánh với giải pháp của bạn ngay sau khi tôi giải quyết một vài vấn đề với biên dịch. – Charanpal

1

Có thể hơi trễ với phản hồi. Nó có thể có thể để có được tốc độ đáng tin cậy song song bằng cách sử dụng gói pyTrilinos cung cấp hàm bao python cho nhiều chức năng trong Trilinos. Dưới đây là ví dụ của bạn chuyển đổi sang sử dụng pyTrilinos:

from PyTrilinos import Epetra 
from scipy.sparse import rand 
import numpy as np 

n_rows = 10000 
n_cols = 8000 
n_vecs = 20 
fill_factor = 0.1 

comm = Epetra.PyComm() 
my_id = comm.MyPID() 

row_map = Epetra.Map(n_rows, 0, comm) 
out_vec_map = row_map 
in_vec_map = Epetra.Map(n_cols, 0, comm) 
col_map = Epetra.Map(n_cols, range(n_cols), 0, comm) 

n_local_rows = row_map.NumMyElements() 

# Create local block matrix in scipy and convert to Epetra 
X = rand(n_local_rows, n_cols, fill_factor).tocoo() 

A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map, int(fill_factor*n_cols*1.2), True) 
A.InsertMyValues(X.row, X.col, X.data) 
A.FillComplete() 

# Create sub-vectors in numpy and convert to Epetra format 
W = np.random.rand(in_vec_map.NumMyElements(), n_vecs) 
V = Epetra.MultiVector(in_vec_map, n_vecs) 

V[:] = W.T # order of indices is opposite 

B = Epetra.MultiVector(out_vec_map, n_vecs) 

# Multiply 
A.Multiply(False, V, B) 

Sau đó bạn có thể chạy mã này sử dụng MPI

mpiexec -n 2 python scipy_to_trilinos.py 

ví dụ khác về PyTrilinos có thể được tìm thấy trên kho github here. Tất nhiên nếu một người sử dụng pyTrilinos, cách này để khởi tạo ma trận bằng cách sử dụng scipy có thể không phải là tối ưu nhất.

Các vấn đề liên quan