Tôi đang gặp vấn đề sau trong python.Ghi vào một tập tin với đa xử lý
Tôi cần thực hiện một số phép tính song song với kết quả mà tôi cần phải được viết tuần tự trong một tệp. Vì vậy, tôi đã tạo ra một chức năng tiếp nhận một multiprocessing.Queue
và một tập tin xử lý, thực hiện các tính toán và in kết quả trong file:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation
# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file
def work(queue, fh):
while True:
try:
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
except:
break
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fh = open("foo", "w")
workQueue = Queue()
parList = # list of conditions for which I want to run doCalculation()
for x in parList:
workQueue.put(x)
processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
for p in processes:
p.start()
for p in processes:
p.join()
fh.close()
Nhưng những tập tin kết thúc trống sau khi kịch bản chạy. Tôi đã cố thay đổi hàm worker() thành:
def work(queue, filename):
while True:
try:
fh = open(filename, "a")
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
fh.close()
except:
break
và chuyển tên tệp làm tham số. Sau đó, nó hoạt động như tôi dự định. Khi tôi cố gắng làm điều tương tự tuần tự, mà không cần đa xử lý, nó cũng hoạt động bình thường.
Tại sao nó không hoạt động trong phiên bản đầu tiên? Tôi không thể nhìn thấy vấn đề.
Ngoài ra: tôi có thể đảm bảo rằng hai quy trình sẽ không cố gắng ghi tệp đồng thời không?
EDIT:
Cảm ơn. Tôi đã nhận nó ngay bây giờ. Đây là phiên bản đang hoạt động:
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform
def doCalculation(par):
t = uniform(0,2)
sleep(t)
return par * par # just to simulate some calculation
def feed(queue, parlist):
for par in parlist:
queue.put(par)
def calc(queueIn, queueOut):
while True:
try:
par = queueIn.get(block = False)
print "dealing with ", par, ""
res = doCalculation(par)
queueOut.put((par,res))
except:
break
def write(queue, fname):
fhandle = open(fname, "w")
while True:
try:
par, res = queue.get(block = False)
print >>fhandle, par, res
except:
break
fhandle.close()
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fname = "foo"
workerQueue = Queue()
writerQueue = Queue()
parlist = [1,2,3,4,5,6,7,8,9,10]
feedProc = Process(target = feed , args = (workerQueue, parlist))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target = write, args = (writerQueue, fname))
feedProc.start()
for p in calcProc:
p.start()
writProc.start()
feedProc.join()
for p in calcProc:
p.join()
writProc.join()
Vui lòng tập trung. Một bộ mã ** chỉ **. Vui lòng xóa mã lỗi thời hoặc không liên quan. Vui lòng tránh sử dụng "Chỉnh sửa". Chỉ cần nhận được câu hỏi để được hoàn toàn rõ ràng, đầy đủ và nhất quán, xin vui lòng. –