Tôi không phải là chuyên gia về python nhưng tôi đã quản lý để viết ra một mã đa xử lý có sử dụng tất cả các lõi và lõi của tôi trong PC của tôi. Mã của tôi tải một mảng rất lớn, khoảng 1,6 GB và tôi cần cập nhật mảng trong mọi quá trình. May mắn thay, bản cập nhật bao gồm thêm một số ngôi sao nhân tạo vào hình ảnh và mọi quá trình đều có một bộ các vị trí hình ảnh khác nhau để thêm các sao nhân tạo.Đa xử lý Python và biến chia sẻ
Hình ảnh quá lớn và tôi không thể tạo hình ảnh mới mỗi lần gọi một quy trình. Giải pháp của tôi là tạo ra một biến trong bộ nhớ chia sẻ và tôi tiết kiệm rất nhiều bộ nhớ. Vì một số lý do, nó hoạt động cho 90% hình ảnh nhưng có những vùng được mã của tôi thêm số ngẫu nhiên vào một số vị trí mà tôi đã gửi trước đó cho các quy trình. Có liên quan đến cách tôi tạo biến được chia sẻ không? Các quy trình có gây trở ngại cho nhau trong quá trình thực hiện mã của tôi không?
Điều gì đó kỳ lạ là khi sử dụng một CPU và lõi đơn, hình ảnh hoàn hảo 100% và không có số ngẫu nhiên nào được thêm vào hình ảnh. Bạn có gợi ý cho tôi cách chia sẻ một mảng lớn giữa nhiều quy trình không? Đây là phần liên quan của mã của tôi. Vui lòng đọc dòng khi tôi xác định biến im_data.
import warnings
warnings.filterwarnings("ignore")
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes
class Worker(multiprocessing.Process):
def __init__(self, work_queue, result_queue):
# base class initialization
multiprocessing.Process.__init__(self)
# job management stuff
self.work_queue = work_queue
self.result_queue = result_queue
self.kill_received = False
def run(self):
while not self.kill_received:
# get a task
try:
i_range, psf_file = self.work_queue.get_nowait()
except Queue.Empty:
break
# the actual processing
print "Adding artificial stars - index range=", i_range
radius=16
x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2)
x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
distance = np.sqrt(x**2 + y**2)
for i in range(i_range[0],i_range[1]):
psf_xy=np.zeros(psf_size[1:3], dtype=float)
j=0
for i_order in range(psf_order+1):
j_order=0
while (i_order+j_order < psf_order+1):
psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
j_order+=1
j+=1
psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy)
psf_xy *= psf_factor
npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
npsf_xy *= npsf_factor
im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.
self.result_queue.put(id)
if __name__ == "__main__":
n_cpu=2
n_core=6
n_processes=n_cpu*n_core*1
input_mock_file=sys.argv[1]
print "Reading file ", im_file[i]
hdu=pyfits.open(im_file[i])
data=hdu[0].data
im_size=data.shape
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
data=0
assert im_data.base.base is im_data_base.get_obj()
# run
# load up work queue
tic=time.time()
j_step=np.int(np.ceil(mock_n*1./n_processes))
j_range=range(0,mock_n,j_step)
j_range.append(mock_n)
work_queue = multiprocessing.Queue()
for j in range(np.size(j_range)-1):
if work_queue.full():
print "Oh no! Queue is full after only %d iterations" % j
work_queue.put((j_range[j:j+2], psf_file[i]))
# create a queue to pass to workers to store the results
result_queue = multiprocessing.Queue()
# spawn workers
for j in range(n_processes):
worker = Worker(work_queue, result_queue)
worker.start()
# collect the results off the queue
while not work_queue.empty():
result_queue.get()
print "Writing file ", mock_im_file[i]
hdu[0].data=im_data
hdu.writeto(mock_im_file[i])
print "%f s for parallel computation." % (time.time() - tic)
Thay vì chia sẻ mảng lớn, bạn không thể chia nó thành các mảng con nhỏ hơn và gửi các subarrays này đến các subprocesses? Và sau đó kết hợp kết quả trở lại mảng ban đầu. – freakish
Và cũng xem xét việc sử dụng một cái gì đó khác nhau sau đó Python để xử lý hình ảnh khổng lồ như vậy (C addon?). – freakish