2012-04-01 27 views
19

Tôi đang sử dụng mô-đun multiprocessing của Python để xử lý song song mảng lớn song song. Các mảng được ánh xạ bộ nhớ bằng cách sử dụng numpy.load(mmap_mode='r') trong quy trình tổng thể. Sau đó, multiprocessing.Pool() dồn quá trình (tôi đoán).NumPy so với đa xử lý và mmap

Tất cả mọi thứ dường như làm việc tốt, ngoại trừ tôi nhận được những câu nói như:

AttributeError ("đối tượng 'NoneType' không có thuộc tính 'nói'",) trong <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)> lờ

trong nhật ký không lưu trữ. Tuy nhiên, các bài kiểm tra đã trôi qua.

Bất kỳ ý tưởng gì đang xảy ra ở đó?

Sử dụng Python 2.7.2, OS X, NumPy 1.6.1.


UPDATE:

Sau khi một số lỗi, tôi săn lùng nguyên nhân để một con đường mã mà đã sử dụng một (slice nhỏ) mảng NumPy này bộ nhớ ánh xạ như đầu vào cho một cuộc gọi Pool.imap.

Rõ ràng "vấn đề" là cách thức multiprocessing.Pool.imap chuyển đầu vào của nó cho các quy trình mới: nó sử dụng dưa chuột. Điều này không hoạt động với mmap mảng numpy ed, và một cái gì đó bên trong vi phạm dẫn đến lỗi.

Tôi đã tìm thấy this reply bởi Robert Kern mà dường như giải quyết cùng một vấn đề. Ông đề nghị tạo một đường dẫn mã đặc biệt khi đầu vào imap xuất phát từ mảng được ánh xạ bộ nhớ: ánh xạ bộ nhớ cùng một mảng theo cách thủ công trong quá trình sinh ra.

Điều này sẽ rất phức tạp và xấu xí đến mức tôi muốn sống với lỗi và các bản sao bộ nhớ bổ sung. Có cách nào khác sẽ nhẹ hơn khi sửa đổi mã hiện tại không?

Trả lời

22

Cách tiếp cận thông thường của tôi (nếu bạn có thể sống với các bản sao bộ nhớ bổ sung) là thực hiện tất cả IO trong một quy trình và sau đó gửi mọi thứ ra ngoài một chuỗi các chuỗi công nhân. Để tải một lát của mảng được ghi vào bộ nhớ chỉ cần thực hiện x = np.array(data[yourslice]) (data[yourslice].copy() không thực sự làm điều này, điều này có thể dẫn đến một số nhầm lẫn.).

Trước hết, chúng ta hãy tạo ra một số dữ liệu thử nghiệm:

import numpy as np 
np.random.random(10000).tofile('data.dat') 

Bạn có thể tái tạo lỗi của bạn với một cái gì đó như thế này:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield data[start:stop] 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

Và nếu bạn chỉ cần chuyển sang năng suất np.array(data[start:stop]) thay vào đó, bạn sẽ khắc phục sự cố:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield np.array(data[start:stop]) 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

Tất nhiên, điều này tạo thêm bản sao trong bộ nhớ của từng đoạn.

Về lâu dài, có thể bạn sẽ thấy dễ dàng hơn khi chuyển khỏi các tệp được ghi nhớ và chuyển sang một thứ như HDF. Điều này đặc biệt đúng nếu dữ liệu của bạn đa chiều. (Tôi khuyên bạn nên h5py, nhưng pyTables thì tốt nếu dữ liệu của bạn "giống bảng".)

Chúc may mắn, với mọi giá!

+0

Joe câu trả lời của bạn luôn luôn đá. Tôi đã cố gắng tìm ra một thứ như thế này. – YXD

+0

Cảm ơn lời khuyên HDF. Trông giống như một sự thay đổi lớn nhưng nó có thể đáng giá, tôi sẽ kiểm tra nó. – user124114