Mỗi khi bạn mở một tệp trong chế độ ghi (w
), một tệp mới sẽ được tạo - vì vậy nội dung của tệp sẽ bị mất nếu nó đã tồn tại. Chỉ xử lý tệp cuối cùng mới có thể ghi thành công vào tệp. Ngay cả khi bạn thay đổi chế độ đó để thêm, bạn không nên cố gắng ghi vào cùng một tệp từ nhiều quy trình - đầu ra sẽ bị cắt xén nếu hai quá trình cố gắng viết cùng một lúc.
Thay vào đó, có tất cả các quá trình lao động đưa ra trong một hàng đợi, và có một quá trình chuyên dụng đơn (hoặc là một tiến trình con hoặc quá trình chính) xử lý đầu ra khỏi hàng đợi và ghi vào tập tin:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None
def Simulation(inqueue, output):
for ii in iter(inqueue.get, sentinel):
output.put(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
def handle_output(output):
hdf = pt.openFile('simulation.h5', mode='w')
while True:
args = output.get()
if args:
method, args = args
getattr(hdf, method)(*args)
else:
break
hdf.close()
if __name__ == '__main__':
output = mp.Queue()
inqueue = mp.Queue()
jobs = []
proc = mp.Process(target=handle_output, args=(output,))
proc.start()
for i in range(num_processes):
p = mp.Process(target=Simulation, args=(inqueue, output))
jobs.append(p)
p.start()
for i in range(num_simulations):
inqueue.put(i)
for i in range(num_processes):
# Send the sentinal to tell Simulation to end
inqueue.put(sentinel)
for p in jobs:
p.join()
output.put(None)
proc.join()
Để so sánh, đây là một phiên bản trong đó sử dụng mp.Pool
:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
def Simulation(ii):
result = []
result.append(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
return result
def handle_output(result):
hdf = pt.openFile('simulation.h5', mode='a')
for args in result:
method, args = args
getattr(hdf, method)(*args)
hdf.close()
if __name__ == '__main__':
# clear the file
hdf = pt.openFile('simulation.h5', mode='w')
hdf.close()
pool = mp.Pool(num_processes)
for i in range(num_simulations):
pool.apply_async(Simulation, (i,), callback=handle_output)
pool.close()
pool.join()
Có vẻ đơn giản hơn phải không? Tuy nhiên có một sự khác biệt đáng kể. Mã ban đầu được sử dụng output.put
để gửi args đến handle_output
đang chạy trong tiến trình con của riêng nó. handle_output
sẽ mất args
từ hàng đợi output
và xử lý chúng ngay lập tức. Với mã Hồ bơi ở trên, Simulation
tích lũy toàn bộ một bó số args
trong result
và result
không được gửi đến handle_output
cho đến khi sau Simulation
trả lại.
Nếu Simulation
mất nhiều thời gian, sẽ có thời gian chờ đợi lâu trong khi không có gì được ghi vào simulation.h5
.
Ngoài câu hỏi này tôi đã sử dụng mã ở trên với thành công nhưng bây giờ mở rộng mô phỏng này, Vòng lặp for được xác định bởi phạm vi = (1000) và vòng lặp for được xác định bởi phạm vi b = (100). Điều này dẫn đến kết quả là việc sử dụng rộng rãi trí nhớ của tôi. Tôi có 8 CPU với RAM 16 Gb nhưng khi tôi chạy các tập tin (thậm chí không có mô phỏng thực tế) sử dụng RAM của tôi đi đến 100% mà kết quả hệ thống của tôi để gian hàng. – user2143958
Tôi nghĩ rằng chúng ta cần tách số lượng các quy trình con khỏi số nhiệm vụ. Có vẻ như bạn muốn 1000 nhiệm vụ, nhưng có lẽ không phải 1000 quy trình con. Tôi sẽ chỉnh sửa bài đăng để đề xuất cách bạn có thể làm điều đó. – unutbu
Có bạn đúng, trong ví dụ trước cho các lần lặp lại lớn, một số lượng lớn các quy trình con đã được tạo ra làm tắc nghẽn toàn bộ bộ nhớ. Tệp bạn đã chỉnh sửa hoạt động hoàn hảo! Nhưng chỉ để làm rõ, tôi cũng đã thử nghiệm với các Pool() chức năng và chức năng này dường như làm việc khá tốt cũng mặc dù nó trở nên khó khăn hơn khi nhiều hơn một biến cần phải được thông qua. Lý do chính để chọn hàm Process() trong hàm Pool() là gì? – user2143958