2014-10-27 18 views
7

Tôi có hàng trăm nghìn tệp văn bản mà tôi muốn phân tích theo nhiều cách khác nhau. Tôi muốn lưu đầu ra vào một tập tin duy nhất mà không có vấn đề đồng bộ hóa. Tôi đã sử dụng hồ bơi đa xử lý để làm điều này để tiết kiệm thời gian, nhưng tôi không thể tìm ra cách kết hợp Pool và Queue.Python: Viết vào một tệp có hàng đợi trong khi sử dụng đa xử lý Pool

Mã sau sẽ lưu tên vô cùng cũng như số "x" liên tiếp tối đa trong tệp. Tuy nhiên, tôi muốn tất cả các quy trình lưu kết quả vào cùng một tệp và không lưu vào các tệp khác nhau như trong ví dụ của tôi. Bất kì sự giúp đỡ nào trong việc này đều rất được trân trọng.

import multiprocessing 

with open('infilenamess.txt') as f: 
    filenames = f.read().splitlines() 

def mp_worker(filename): 
with open(filename, 'r') as f: 
     text=f.read() 
     m=re.findall("x+", text) 
     count=len(max(m, key=len)) 
     outfile=open(filename+'_results.txt', 'a') 
     outfile.write(str(filename)+'|'+str(count)+'\n') 
     outfile.close() 

def mp_handler(): 
    p = multiprocessing.Pool(32) 
    p.map(mp_worker, filenames) 

if __name__ == '__main__': 
    mp_handler() 

Trả lời

21

Bể đa xử lý sẽ thực hiện hàng đợi cho bạn. Chỉ cần sử dụng phương thức pool trả về giá trị trả về cho người gọi. imap hoạt động tốt:

import multiprocessing 
import re 

def mp_worker(filename): 
    with open(filename) as f: 
     text = f.read() 
    m = re.findall("x+", text) 
    count = len(max(m, key=len)) 
    return filename, count 

def mp_handler(): 
    p = multiprocessing.Pool(32) 
    with open('infilenamess.txt') as f: 
     filenames = [line for line in (l.strip() for l in f) if line] 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, filenames): 
      # (filename, count) tuples from worker 
      f.write('%s: %d\n' % result) 

if __name__=='__main__': 
    mp_handler() 
+0

Vì vậy, tôi lặp lại từng kết quả và ghi chúng vào tệp khi chúng đến? Điều đó có nghĩa là nhân viên mới sẽ không bắt đầu cho đến khi mỗi "kết quả" đã được viết, hoặc sẽ chạy 32 lần, nhưng sẽ đợi để viết? Ngoài ra, bạn có thể giải thích tại sao bạn thay thế f.read(). Splitlines() bằng [line for line in (l.strip() cho l trong f) nếu dòng]? – risraelsen

+0

32 quy trình chạy trong nền và nhận thêm tên tệp trong "khối" khi chúng chuyển kết quả về quy trình gốc. Kết quả được truyền lại ngay lập tức để cha mẹ đang làm công việc của nó song song. Một chút hiệu quả của nó để đọc các dòng tập tin theo dòng hơn là đọc toàn bộ điều và chia nó sau này ... đó là những gì danh sách cho. – tdelaney

+2

câu trả lời hay/ví dụ – zach

3

Tôi đã trả lời chấp nhận và đơn giản hóa nó cho sự hiểu biết của tôi về cách làm việc này. Tôi đăng nó ở đây trong trường hợp nó giúp người khác.

import multiprocessing 

def mp_worker(number): 
    number += 1 
    return number 

def mp_handler(): 
    p = multiprocessing.Pool(32) 
    numbers = list(range(1000)) 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, numbers): 
      f.write('%d\n' % result) 

if __name__=='__main__': 
    mp_handler() 
Các vấn đề liên quan