2011-06-29 39 views
8

Tôi đang gặp vấn đề sau trong python.Ghi vào một tập tin với đa xử lý

Tôi cần thực hiện một số phép tính song song với kết quả mà tôi cần phải được viết tuần tự trong một tệp. Vì vậy, tôi đã tạo ra một chức năng tiếp nhận một multiprocessing.Queue và một tập tin xử lý, thực hiện các tính toán và in kết quả trong file:

import multiprocessing 
from multiprocessing import Process, Queue 
from mySimulation import doCalculation 

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file 

def work(queue, fh): 
while True: 
    try: 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
    except: 
     break 


if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fh = open("foo", "w") 
    workQueue = Queue() 
    parList = # list of conditions for which I want to run doCalculation() 
    for x in parList: 
     workQueue.put(x) 
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)] 
    for p in processes: 
     p.start() 
    for p in processes: 
     p.join() 
    fh.close() 

Nhưng những tập tin kết thúc trống sau khi kịch bản chạy. Tôi đã cố thay đổi hàm worker() thành:

def work(queue, filename): 
while True: 
    try: 
     fh = open(filename, "a") 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
     fh.close() 
    except: 
     break 

và chuyển tên tệp làm tham số. Sau đó, nó hoạt động như tôi dự định. Khi tôi cố gắng làm điều tương tự tuần tự, mà không cần đa xử lý, nó cũng hoạt động bình thường.

Tại sao nó không hoạt động trong phiên bản đầu tiên? Tôi không thể nhìn thấy vấn đề.

Ngoài ra: tôi có thể đảm bảo rằng hai quy trình sẽ không cố gắng ghi tệp đồng thời không?


EDIT:

Cảm ơn. Tôi đã nhận nó ngay bây giờ. Đây là phiên bản đang hoạt động:

import multiprocessing 
from multiprocessing import Process, Queue 
from time import sleep 
from random import uniform 

def doCalculation(par): 
    t = uniform(0,2) 
    sleep(t) 
    return par * par # just to simulate some calculation 

def feed(queue, parlist): 
    for par in parlist: 
      queue.put(par) 

def calc(queueIn, queueOut): 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      print "dealing with ", par, "" 
      res = doCalculation(par) 
      queueOut.put((par,res)) 
     except: 
      break 

def write(queue, fname): 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      par, res = queue.get(block = False) 
      print >>fhandle, par, res 
     except: 
      break 
    fhandle.close() 

if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fname = "foo" 
    workerQueue = Queue() 
    writerQueue = Queue() 
    parlist = [1,2,3,4,5,6,7,8,9,10] 
    feedProc = Process(target = feed , args = (workerQueue, parlist)) 
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] 
    writProc = Process(target = write, args = (writerQueue, fname)) 


    feedProc.start() 
    for p in calcProc: 
     p.start() 
    writProc.start() 

    feedProc.join() 
    for p in calcProc: 
     p.join() 
    writProc.join() 
+2

Vui lòng tập trung. Một bộ mã ** chỉ **. Vui lòng xóa mã lỗi thời hoặc không liên quan. Vui lòng tránh sử dụng "Chỉnh sửa". Chỉ cần nhận được câu hỏi để được hoàn toàn rõ ràng, đầy đủ và nhất quán, xin vui lòng. –

Trả lời

16

Bạn thực sự nên sử dụng hai hàng đợi và ba loại xử lý riêng biệt.

  1. Đặt nội dung vào Hàng đợi # 1.

  2. Đưa nội dung ra khỏi Hàng đợi # 1 và thực hiện các phép tính, đưa nội dung vào Hàng đợi # 2. Bạn có thể có nhiều trong số này, kể từ khi họ nhận được từ một hàng đợi và đưa vào hàng đợi khác một cách an toàn.

  3. Đưa nội dung ra khỏi Hàng đợi # 2 và ghi nó vào một tệp. Bạn phải có chính xác 1 trong số này và không còn nữa. Nó "sở hữu" các tập tin, đảm bảo truy cập nguyên tử, và hoàn toàn đảm bảo rằng các tập tin được viết sạch và nhất quán.

+1

+1 cho hàng công nhân và người tiêu dùng. Hãy nhớ đặt maxsize trên hàng đợi hoặc nhân viên của bạn có thể ăn bộ nhớ của bạn và bỏ đói người viết. – Bittrance

+0

@ S.Lott @Bittrance hãy xem chỉnh sửa của tôi. –

+1

Oh nevermind về việc chạy nhiều ... Tôi ngu ngốc, đủ để không nhận thấy rằng tôi đã khởi chạy feedProc và writProc nhiều lần. ¬¬ Tôi đã sửa mã. Nhưng tôi vẫn có một tập tin trống. –

4

Nếu có ai đó đang tìm cách đơn giản để làm như vậy, điều này có thể giúp bạn. Tôi không nghĩ có bất kỳ nhược điểm nào khi thực hiện theo cách này. Nếu có, xin vui lòng cho tôi biết.

import multiprocessing 
import re 

def mp_worker(item): 
    # Do something 
    return item, count 

def mp_handler(): 
    cpus = multiprocessing.cpu_count() 
    p = multiprocessing.Pool(cpus) 
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step. 
    with open('ExampleFile.txt') as f: 
     listX = [line for line in (l.strip() for l in f) if line] 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, listX): 
      # (item, count) tuples from worker 
      f.write('%s: %d\n' % result) 

if __name__=='__main__': 
    mp_handler() 

Nguồn: Python: Writing to a single file with queue while using multiprocessing Pool

0

Có một lỗi trong mã ghi công nhân, nếu khối là sai, người lao động sẽ không bao giờ nhận được bất kỳ dữ liệu. Nên thực hiện như sau:

par, res = queue.get(block = True) 

Bạn có thể kiểm tra nó bằng cách thêm dòng

print "QSize",queueOut.qsize() 

sau queueOut.put((par,res))

Với khối = False bạn sẽ nhận được ngày càng tăng chiều dài của hàng đợi cho đến khi nó điền vào, không giống như với khối = True, nơi bạn nhận được luôn luôn "1".

Các vấn đề liên quan