2013-03-29 25 views
6

Điều này có vẻ như là một vấn đề đơn giản nhưng tôi không thể có được đầu của tôi xung quanh nó.Ghi dữ liệu vào tập tin hdf bằng cách sử dụng đa xử lý

Tôi có mô phỏng chạy trong vòng lặp đôi và ghi kết quả vào tệp HDF. Một phiên bản đơn giản của chương trình này được hiển thị dưới đây:

import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    for ii in a: 
     print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 
Simulation() 

Mã này thực hiện chính xác những gì tôi muốn nhưng vì quá trình này có thể mất nhiều thời gian để chạy tôi cố gắng sử dụng các mô-đun đa và sử dụng đoạn mã sau:

import multiprocessing 
import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(ii): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 

if __name__ == '__main__': 
    jobs = [] 
    for ii in a: 
     p = multiprocessing.Process(target=Simulation, args=(ii,)) 
     jobs.append(p)  
     p.start() 

Tuy nhiên, điều này chỉ in mô phỏng cuối cùng cho tệp HDF, bằng cách nào đó nó ghi đè tất cả các nhóm khác.

Trả lời

10

Mỗi khi bạn mở một tệp trong chế độ ghi (w), một tệp mới sẽ được tạo - vì vậy nội dung của tệp sẽ bị mất nếu nó đã tồn tại. Chỉ xử lý tệp cuối cùng mới có thể ghi thành công vào tệp. Ngay cả khi bạn thay đổi chế độ đó để thêm, bạn không nên cố gắng ghi vào cùng một tệp từ nhiều quy trình - đầu ra sẽ bị cắt xén nếu hai quá trình cố gắng viết cùng một lúc.

Thay vào đó, có tất cả các quá trình lao động đưa ra trong một hàng đợi, và có một quá trình chuyên dụng đơn (hoặc là một tiến trình con hoặc quá trình chính) xử lý đầu ra khỏi hàng đợi và ghi vào tập tin:


import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 
sentinel = None 


def Simulation(inqueue, output): 
    for ii in iter(inqueue.get, sentinel): 
     output.put(('createGroup', ('/', 'A%s' % ii))) 
     for i in range(num_arrays): 
      output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 


def handle_output(output): 
    hdf = pt.openFile('simulation.h5', mode='w') 
    while True: 
     args = output.get() 
     if args: 
      method, args = args 
      getattr(hdf, method)(*args) 
     else: 
      break 
    hdf.close() 

if __name__ == '__main__': 
    output = mp.Queue() 
    inqueue = mp.Queue() 
    jobs = [] 
    proc = mp.Process(target=handle_output, args=(output,)) 
    proc.start() 
    for i in range(num_processes): 
     p = mp.Process(target=Simulation, args=(inqueue, output)) 
     jobs.append(p) 
     p.start() 
    for i in range(num_simulations): 
     inqueue.put(i) 
    for i in range(num_processes): 
     # Send the sentinal to tell Simulation to end 
     inqueue.put(sentinel) 
    for p in jobs: 
     p.join() 
    output.put(None) 
    proc.join() 

Để so sánh, đây là một phiên bản trong đó sử dụng mp.Pool:

import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 


def Simulation(ii): 
    result = [] 
    result.append(('createGroup', ('/', 'A%s' % ii))) 
    for i in range(num_arrays): 
     result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 
    return result 


def handle_output(result): 
    hdf = pt.openFile('simulation.h5', mode='a') 
    for args in result: 
     method, args = args 
     getattr(hdf, method)(*args) 
    hdf.close() 


if __name__ == '__main__': 
    # clear the file 
    hdf = pt.openFile('simulation.h5', mode='w') 
    hdf.close() 
    pool = mp.Pool(num_processes) 
    for i in range(num_simulations): 
     pool.apply_async(Simulation, (i,), callback=handle_output) 
    pool.close() 
    pool.join() 

Có vẻ đơn giản hơn phải không? Tuy nhiên có một sự khác biệt đáng kể. Mã ban đầu được sử dụng output.put để gửi args đến handle_output đang chạy trong tiến trình con của riêng nó. handle_output sẽ mất args từ hàng đợi output và xử lý chúng ngay lập tức. Với mã Hồ bơi ở trên, Simulation tích lũy toàn bộ một bó số args trong resultresult không được gửi đến handle_output cho đến khi sau Simulation trả lại.

Nếu Simulation mất nhiều thời gian, sẽ có thời gian chờ đợi lâu trong khi không có gì được ghi vào simulation.h5.

+0

Ngoài câu hỏi này tôi đã sử dụng mã ở trên với thành công nhưng bây giờ mở rộng mô phỏng này, Vòng lặp for được xác định bởi phạm vi = (1000) và vòng lặp for được xác định bởi phạm vi b = (100). Điều này dẫn đến kết quả là việc sử dụng rộng rãi trí nhớ của tôi. Tôi có 8 CPU với RAM 16 Gb nhưng khi tôi chạy các tập tin (thậm chí không có mô phỏng thực tế) sử dụng RAM của tôi đi đến 100% mà kết quả hệ thống của tôi để gian hàng. – user2143958

+0

Tôi nghĩ rằng chúng ta cần tách số lượng các quy trình con khỏi số nhiệm vụ. Có vẻ như bạn muốn 1000 nhiệm vụ, nhưng có lẽ không phải 1000 quy trình con. Tôi sẽ chỉnh sửa bài đăng để đề xuất cách bạn có thể làm điều đó. – unutbu

+0

Có bạn đúng, trong ví dụ trước cho các lần lặp lại lớn, một số lượng lớn các quy trình con đã được tạo ra làm tắc nghẽn toàn bộ bộ nhớ. Tệp bạn đã chỉnh sửa hoạt động hoàn hảo! Nhưng chỉ để làm rõ, tôi cũng đã thử nghiệm với các Pool() chức năng và chức năng này dường như làm việc khá tốt cũng mặc dù nó trở nên khó khăn hơn khi nhiều hơn một biến cần phải được thông qua. Lý do chính để chọn hàm Process() trong hàm Pool() là gì? – user2143958

Các vấn đề liên quan