Tôi có một ma trận thưa thớt lớn X ở định dạng scipy.sparse.csr_matrix và tôi muốn nhân cái này với một mảng khối u W sử dụng tính song song. Sau một số nghiên cứu tôi phát hiện ra tôi cần sử dụng Mảng trong đa xử lý để tránh sao chép X và W giữa các quá trình (ví dụ: tại đây: How to combine Pool.map with Array (shared memory) in Python multiprocessing? và Is shared readonly data copied to different processes for Python multiprocessing?). Đây là nỗ lực mới nhất của tôiLàm thế nào để song song các phép nhân ma trận thưa thớt scipy
import multiprocessing
import numpy
import scipy.sparse
import time
def initProcess(data, indices, indptr, shape, Warr, Wshp):
global XData
global XIndices
global XIntptr
global Xshape
XData = data
XIndices = indices
XIntptr = indptr
Xshape = shape
global WArray
global WShape
WArray = Warr
WShape = Wshp
def dot2(args):
rowInds, i = args
global XData
global XIndices
global XIntptr
global Xshape
data = numpy.frombuffer(XData, dtype=numpy.float)
indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
global WArray
global WShape
W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape)
return Xr[rowInds[i]:rowInds[i+1], :].dot(W)
def getMatmat(X):
numJobs = multiprocessing.cpu_count()
rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int)
#Store the data in X as RawArray objects so we can share it amoung processes
XData = multiprocessing.RawArray("d", X.data)
XIndices = multiprocessing.RawArray("i", X.indices)
XIndptr = multiprocessing.RawArray("i", X.indptr)
def matmat(W):
WArray = multiprocessing.RawArray("d", W.flatten())
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
params = []
for i in range(numJobs):
params.append((rowInds, i))
iterator = pool.map(dot2, params)
P = numpy.zeros((X.shape[0], W.shape[1]))
for i in range(numJobs):
P[rowInds[i]:rowInds[i+1], :] = iterator[i]
return P
return matmat
if __name__ == '__main__':
#Create a random sparse matrix X and a random dense one W
X = scipy.sparse.rand(10000, 8000, 0.1)
X = X.tocsr()
W = numpy.random.rand(8000, 20)
startTime = time.time()
A = getMatmat(X)(W)
parallelTime = time.time()-startTime
startTime = time.time()
B = X.dot(W)
nonParallelTime = time.time()-startTime
print(parallelTime, nonParallelTime)
Tuy nhiên, đầu ra giống như sau: (4.431, 0.165) cho biết phiên bản song song chậm hơn nhiều so với phép nhân song song.
Tôi tin rằng sự chậm lại có thể được gây ra trong các tình huống tương tự khi đang sao chép dữ liệu lớn vào quy trình, nhưng đây không phải là trường hợp ở đây khi tôi sử dụng Array để lưu trữ các biến được chia sẻ (trừ khi nó xảy ra trong numpy.frombuffer hoặc khi nào tạo csr_matrix, nhưng sau đó tôi không thể tìm cách chia sẻ trực tiếp csr_matrix). Một nguyên nhân khác có thể có của tốc độ chậm là trả về một kết quả lớn của mỗi phép nhân ma trận cho mỗi quá trình tuy nhiên tôi không chắc chắn về một cách xung quanh điều này.
Ai đó có thể thấy tôi đang đi sai ở đâu? Cảm ơn bạn đã giúp đỡ!
Cập nhật: Tôi không chắc chắn nhưng tôi cho rằng việc chia sẻ lượng lớn dữ liệu giữa các quy trình không hiệu quả và lý tưởng nhất là tôi nên sử dụng đa luồng (mặc dù Khóa thông dịch toàn cầu (GIL) làm cho điều đó rất khó). Một cách xung quanh điều này là giải phóng GIL bằng cách sử dụng Cython ví dụ (xem http://docs.cython.org/src/userguide/parallelism.html), mặc dù rất nhiều hàm numpy cần phải đi qua GIL.
Bạn có liên quan đến mật độ/scipy với một bản dựng ATLAS được tối ưu hóa, đa luồng không?Nếu bạn làm điều đó, bạn sẽ nhận được phép nhân song song miễn phí khi bạn sử dụng np.dot. –
Tôi đang sử dụng một thư viện BLAS đa luồng (OpenBLAS) được liên kết với numpy/scipy nhưng tôi đã thử nghiệm X.dot (W) và numpy.dot (X, W) (sau này không hoạt động cho thưa thớt X) và điều này không song song. – Charanpal