2013-04-09 23 views
5

Tôi không phải là chuyên gia về python nhưng tôi đã quản lý để viết ra một mã đa xử lý có sử dụng tất cả các lõi và lõi của tôi trong PC của tôi. Mã của tôi tải một mảng rất lớn, khoảng 1,6 GB và tôi cần cập nhật mảng trong mọi quá trình. May mắn thay, bản cập nhật bao gồm thêm một số ngôi sao nhân tạo vào hình ảnh và mọi quá trình đều có một bộ các vị trí hình ảnh khác nhau để thêm các sao nhân tạo.Đa xử lý Python và biến chia sẻ

Hình ảnh quá lớn và tôi không thể tạo hình ảnh mới mỗi lần gọi một quy trình. Giải pháp của tôi là tạo ra một biến trong bộ nhớ chia sẻ và tôi tiết kiệm rất nhiều bộ nhớ. Vì một số lý do, nó hoạt động cho 90% hình ảnh nhưng có những vùng được mã của tôi thêm số ngẫu nhiên vào một số vị trí mà tôi đã gửi trước đó cho các quy trình. Có liên quan đến cách tôi tạo biến được chia sẻ không? Các quy trình có gây trở ngại cho nhau trong quá trình thực hiện mã của tôi không?

Điều gì đó kỳ lạ là khi sử dụng một CPU và lõi đơn, hình ảnh hoàn hảo 100% và không có số ngẫu nhiên nào được thêm vào hình ảnh. Bạn có gợi ý cho tôi cách chia sẻ một mảng lớn giữa nhiều quy trình không? Đây là phần liên quan của mã của tôi. Vui lòng đọc dòng khi tôi xác định biến im_data.

import warnings 
warnings.filterwarnings("ignore") 

from mpl_toolkits.mplot3d import Axes3D 
from matplotlib import cm 
import matplotlib.pyplot as plt 
import sys,os 
import subprocess 
import numpy as np 
import time 
import cv2 as cv 
import pyfits 
from pyfits import getheader 
import multiprocessing, Queue 
import ctypes 

class Worker(multiprocessing.Process): 


def __init__(self, work_queue, result_queue): 

    # base class initialization 
    multiprocessing.Process.__init__(self) 

    # job management stuff 
    self.work_queue = work_queue 
    self.result_queue = result_queue 
    self.kill_received = False 

def run(self): 
    while not self.kill_received: 

     # get a task 
     try: 
      i_range, psf_file = self.work_queue.get_nowait() 
     except Queue.Empty: 
      break 

     # the actual processing 
     print "Adding artificial stars - index range=", i_range 

     radius=16 
     x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2) 
     x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c) 
     distance = np.sqrt(x**2 + y**2) 

     for i in range(i_range[0],i_range[1]): 
      psf_xy=np.zeros(psf_size[1:3], dtype=float) 
      j=0 
      for i_order in range(psf_order+1): 
       j_order=0 
       while (i_order+j_order < psf_order+1): 
        psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order 
        j_order+=1 
        j+=1 


      psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy) 
      psf_xy *= psf_factor 

      npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4) 
      npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy) 
      npsf_xy *= npsf_factor 

      im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])] 
      im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])] 
      npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])] 
      npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])] 

      im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10. 


     self.result_queue.put(id) 

if __name__ == "__main__": 

    n_cpu=2 
    n_core=6 
    n_processes=n_cpu*n_core*1 
    input_mock_file=sys.argv[1] 

    print "Reading file ", im_file[i] 
    hdu=pyfits.open(im_file[i]) 
    data=hdu[0].data 
    im_size=data.shape 

    im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
    im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
    im_data = im_data.reshape(im_size[0], im_size[1]) 
    im_data[:] = data 
    data=0 
    assert im_data.base.base is im_data_base.get_obj() 

    # run 
    # load up work queue 
    tic=time.time() 
    j_step=np.int(np.ceil(mock_n*1./n_processes)) 
    j_range=range(0,mock_n,j_step) 
    j_range.append(mock_n) 


    work_queue = multiprocessing.Queue() 
    for j in range(np.size(j_range)-1): 
    if work_queue.full(): 
     print "Oh no! Queue is full after only %d iterations" % j 
    work_queue.put((j_range[j:j+2], psf_file[i])) 

    # create a queue to pass to workers to store the results 
    result_queue = multiprocessing.Queue() 

    # spawn workers 
    for j in range(n_processes): 
    worker = Worker(work_queue, result_queue) 
    worker.start() 

    # collect the results off the queue 
    while not work_queue.empty(): 
    result_queue.get() 

    print "Writing file ", mock_im_file[i] 
    hdu[0].data=im_data 
    hdu.writeto(mock_im_file[i]) 
    print "%f s for parallel computation." % (time.time() - tic) 
+1

Thay vì chia sẻ mảng lớn, bạn không thể chia nó thành các mảng con nhỏ hơn và gửi các subarrays này đến các subprocesses? Và sau đó kết hợp kết quả trở lại mảng ban đầu. – freakish

+0

Và cũng xem xét việc sử dụng một cái gì đó khác nhau sau đó Python để xử lý hình ảnh khổng lồ như vậy (C addon?). – freakish

Trả lời

3

Tôi nghĩ vấn đề (như bạn đề nghị nó trong câu hỏi của bạn) xuất phát từ thực tế là bạn đang viết trong mảng tương tự từ nhiều luồng.

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 

Mặc dù tôi khá chắc chắn rằng bạn có thể viết vào im_data_base một cách "quá trình an toàn" (một khóa ngầm được sử dụng bởi python để đồng bộ hóa quyền truy cập vào mảng), Tôi không chắc chắn bạn có thể viết thành im_data theo cách xử lý an toàn.

tôi sẽ do đó (mặc dù tôi không chắc chắn tôi sẽ giải quyết vấn đề của bạn) khuyên bạn nên tạo một khóa rõ ràng xung quanh im_data

# Disable python implicit lock, we are going to use our own 
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1], 
    lock=False) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 
# Create our own lock 
im_data_lock = Lock() 

Sau đó, trong quá trình, có được các khóa mỗi khi bạn cần để sửa đổi im_data

self.im_data_lock.acquire() 
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10 
self.im_data_lock.release() 

tôi bỏ qua các mã để vượt qua khóa để các contructor của quá trình của bạn và lưu nó như một trường thành viên (self.im_data_lock) vì lợi ích của ngắn gọn. Bạn cũng nên chuyển mảng im_data cho hàm tạo của quy trình của bạn và lưu nó làm trường thành viên.

1

Sự cố xảy ra trong ví dụ của bạn khi nhiều luồng ghi vào các vùng chồng chéo trong hình ảnh/mảng. Vì vậy, thực sự bạn hoặc phải đặt một khóa cho mỗi hình ảnh hoặc tạo ra một bộ khóa cho mỗi phần hình ảnh (để giảm ganh đua).

Hoặc bạn có thể tạo các sửa đổi hình ảnh trong một bộ quy trình và thực hiện sửa đổi thực tế của hình ảnh trong một chuỗi riêng biệt.

Các vấn đề liên quan