2017-11-01 16 views
9

Giả sử tôi có một tệp văn bản rất lớn bao gồm nhiều dòng mà tôi muốn đảo ngược. Và tôi không quan tâm đến lệnh cuối cùng. Tệp đầu vào chứa các ký hiệu Cyrillic. Tôi sử dụng multiprocessing để xử lý nhiều lõi.Tại sao multiprocessing.Lock() không khóa tài nguyên dùng chung trong Python?

tôi đã viết chương trình như:

# task.py 

import multiprocessing as mp 


POOL_NUMBER = 2 


lock_read = mp.Lock() 
lock_write = mp.Lock() 

fi = open('input.txt', 'r') 
fo = open('output.txt', 'w') 

def handle(line): 
    # In the future I want to do 
    # some more complicated operations over the line 
    return line.strip()[::-1] # Reversing 

def target(): 
    while True: 
     try: 
      with lock_read: 
       line = next(fi) 
     except StopIteration: 
      break 

     line = handle(line) 

     with lock_write: 
      print(line, file=fo) 

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)] 
for p in pool: 
    p.start() 
for p in pool: 
    p.join() 

fi.close() 
fo.close() 

Chương trình này thất bại với lỗi:

Process Process-2: 
Process Process-1: 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "task.py", line 22, in target 
    line = next(fi) 
    File "/usr/lib/python3.5/codecs.py", line 321, in decode 
    (result, consumed) = self._buffer_decode(data, self.errors, final) 
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "task.py", line 22, in target 
    line = next(fi) 
    File "/usr/lib/python3.5/codecs.py", line 321, in decode 
    (result, consumed) = self._buffer_decode(data, self.errors, final) 
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte 

Mặt khác, mọi thứ đều hoạt động tốt nếu tôi đặt POOL_NUMBER = 1. Nhưng nó không có ý nghĩa nếu tôi muốn đạt được tổng hiệu suất.

Tại sao lỗi đó xảy ra? Và làm thế nào tôi có thể sửa nó?

Tôi sử dụng Python 3.5.2.

tôi tạo ra dữ liệu sử dụng kịch bản này:

# gen_file.py 

from random import randint 


LENGTH = 100 
SIZE = 100000 


def gen_word(length): 
    return ''.join(
     chr(randint(ord('а'), ord('я'))) 
     for _ in range(length) 
    ) 


if __name__ == "__main__": 
    with open('input.txt', 'w') as f: 
     for _ in range(SIZE): 
      print(gen_word(LENGTH), file=f) 
+0

Bạn có thể tham khảo một câu trả lời về [Chế biến tập tin duy nhất từ ​​nhiều quy trình trong python] (https : //stackoverflow.com/a/11196615/4662041) – Sheshnath

+0

bạn đã thử đọc tệp đó và in dữ liệu của nó chưa? nếu bạn gặp lại lỗi đó! điều đó có nghĩa là bạn nên đọc nó, dưới dạng chế độ nhị phân với "rb" ... – DRPK

+0

@DRPK tôi đã làm. Nếu tôi thả 'line = handle (line)' khỏi kịch bản của tôi, lỗi tương tự sẽ xuất hiện. – Fomalhaut

Trả lời

3

Vấn đề ở đây là đọc một tập tin từ các quá trình đa không hoạt động như bạn nghĩ, bạn không thể chia sẻ đối tượng open giữa các quá trình.

Bạn có thể tạo biến số toàn cầu current_line và mỗi lần đọc tệp và xử lý dòng hiện tại, không lý tưởng.

Đây là một cách tiếp cận khác nhau, sử dụng hồ bơi quy trình và phương pháp map, tôi lặp lại trong tập tin, và cho mỗi dòng tôi enqueue phương pháp mục tiêu của bạn:

from multiprocessing import Lock 
from multiprocessing import Pool 
import time 
import os 

POOL_NUMBER = 8 

def target(line): 
    # Really need some processing here 
    for _ in range(2**10): 
     pass 
    return line[::-1] 


pool = Pool(processes=POOL_NUMBER) 
os.truncate('output.txt', 0) # Just to make sure we have plan new file 
with open('input.txt', 'r') as fi: 
    t0 = time.time() 
    processed_lines = pool.map(target, fi.readlines()) 
    print('Total time', time.time() - t0) 

    with open('output.txt', 'w') as fo: 
     for processed_line in processed_lines: 
      fo.writelines(processed_line) 

Với 8 quy trình trên máy tính của tôi: Total time 1.3367934226989746

Và với 1 quá trình: Total time 4.324501991271973

này hoạt động tốt nhất nếu chức năng mục tiêu của bạn là CPU bị ràng buộc, một ap khác nhau proach sẽ chia tập tin thành các đoạn POOL_NUMBER và làm cho mỗi quá trình ghi một đoạn dữ liệu đã xử lý (với khóa!) vào tệp đầu ra.

Cách tiếp cận khác, là tạo quy trình tổng thể thực hiện công việc ghi cho phần còn lại của quy trình, here là một ví dụ.

EDIT

Sau khi bạn nhận xét tôi đã tìm bạn không thể phù hợp với những tập tin vào bộ nhớ. Đối với điều này, bạn chỉ có thể lặp qua đối tượng tệp sẽ đọc từng dòng vào bộ nhớ. Nhưng hơn chúng ta cần phải sửa đổi mã một lớn nhỏ:

POOL_NUMBER = 8 
CHUNK_SIZE = 50000 

def target(line): 
    # This is not a measurable task, since most of the time wil spent on writing the data 
    # if you have a CPU bound task, this code will make sense 
    return line[::-1] 


pool = Pool(processes=POOL_NUMBER) 
os.truncate('output.txt', 0) # Just to make sure we have plan new file 
processed_lines = [] 

with open('input.txt', 'r') as fi: 
    t0 = time.time() 
    for line in fi: 
     processed_lines.append(pool.apply_async(target, (line,))) # Keep a refernce to this task, but don't 

     if len(processed_lines) == CHUNK_SIZE: 
      with open('output.txt', 'w') as fo: # reading the file line by line 
       for processed_line in processed_lines: 
        fo.writelines(processed_line.get()) 
      processed_lines = [] # truncate the result list, and let the garbage collector collect the unused memory, if we don't clear the list we will ran out of memory! 
    print('Total time', time.time() - t0) 

Hãy nhớ rằng bạn có thể chơi với CHUNK_SIZE biến để kiểm soát bạn sử dụng bao nhiêu bộ nhớ. Đối với tôi 5000 là khoảng 10K tối đa cho mỗi quá trình.

P.S

Tôi nghĩ tốt nhất là chia tệp lớn thành các tệp nhỏ hơn, cách này bạn giải quyết khóa đọc/ghi trên tệp và cũng có thể mở rộng quy trình (ngay cả trên máy khác!)

+0

Cảm ơn bạn đã cung cấp giải pháp. Nhưng tiếc là nó có một bất lợi lớn. Toàn bộ tệp đầu vào đi vào RAM (vì 'fi.readlines()' hoạt động theo cách này) và 'processed_lines' cũng mất nhiều bộ nhớ. Nói cách khác, kịch bản của bạn tiêu thụ quá nhiều bộ nhớ và nó sẽ không hiệu quả trong trường hợp thực sự rất lớn 'input.txt' (100 triệu dòng). Có thể nâng cấp tập lệnh của bạn để giải quyết vấn đề này không? – Fomalhaut

+0

@Fomalhaut Tôi đã cập nhật câu trả lời của mình, vui lòng thực hiện một vòng lặp :) –

0

Có vẻ như line = next(fi) không được xử lý chính xác theo khác nhau Process.

Có thể bỏ qua nhu cầu sử dụng next(fi) với sự trợ giúp của bộ đệm tạm thời của các dòng được lấp đầy bởi chuỗi chính của chương trình và được đọc bởi từng quy trình. Đối với vai trò này, tốt hơn nên sử dụng multiprocessing.Queue.

Vì vậy, đây là kịch bản của tôi:

from time import sleep, time 
import multiprocessing as mp 
import queue 


MAX_QUEUE_SIZE = 1000 
QUEUE_TIMEOUT = 0.000001 
POOL_NUMBER = 4 


def handle(line): 
    sleep(0.00001) # Some processing here that takes time 
    return line.strip()[::-1] 


def target(fout, write_lock, lines_queue): 
    while True: 
     try: 
      line = lines_queue.get(timeout=1.0) 
      line = handle(line) 
      with write_lock: 
       print(line, file=fout) 
       fout.flush() 
     except queue.Empty: 
      break 


if __name__ == "__main__": 
    time_begin = time() 

    with open('output.txt', 'w') as fout: 
     write_lock = mp.Lock() 
     lines_queue = mp.Queue() 

     processes = [ 
      mp.Process(target=target, args=(fout, write_lock, lines_queue)) 
      for _ in range(POOL_NUMBER) 
     ] 
     for p in processes: 
      p.start() 

     with open('input.txt', 'r') as fin: 
      while True: 
       try: 
        while lines_queue.qsize() < MAX_QUEUE_SIZE: 
         line = next(fin) 
         lines_queue.put(line) 
        sleep(QUEUE_TIMEOUT) 
       except StopIteration: 
        break 

     for p in processes: 
      p.join() 

    time_end = time() 
    print("Time:", time_end - time_begin) 

On CPU của tôi, tôi có kết quả này:

POOL_NUMBER = 1 -> Time: 17.877086400985718 
POOL_NUMBER = 2 -> Time: 8.611438989639282 
POOL_NUMBER = 3 -> Time: 6.332395553588867 
POOL_NUMBER = 4 -> Time: 5.321753978729248 
Các vấn đề liên quan