2014-05-12 31 views
9

Tôi đang chạy nhiều cat | zgrep lệnh trên một máy chủ từ xa và thu thập sản lượng của họ riêng để chế biến tiếp:Python: thực hiện mèo subprocess song song

class MainProcessor(mp.Process): 
    def __init__(self, peaks_array): 
     super(MainProcessor, self).__init__() 
     self.peaks_array = peaks_array 

    def run(self): 
     for peak_arr in self.peaks_array: 
      peak_processor = PeakProcessor(peak_arr) 
      peak_processor.start() 

class PeakProcessor(mp.Process): 
    def __init__(self, peak_arr): 
     super(PeakProcessor, self).__init__() 
     self.peak_arr = peak_arr 

    def run(self): 
     command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" ' 
     log_lines = (subprocess.check_output(command, shell=True)).split('\n') 
     process_data(log_lines) 

này, tuy nhiên, kết quả trong thực hiện tuần tự của các tiến trình con ('ssh ... lệnh ...). Đỉnh thứ hai chờ đầu tiên kết thúc và vân vân.

Làm cách nào để tôi có thể sửa đổi mã này để các cuộc gọi subprocess chạy song song, trong khi vẫn có thể thu thập đầu ra cho từng cá nhân?

+0

'--mmap' là vô ích khi đọc từ một đường ống ... – twalberg

Trả lời

-1

Một cách tiếp cận (chứ không phải là gợi ý khác đưa các quy trình shell trong nền) là sử dụng multithreading.

Các run phương pháp mà bạn có sau đó sẽ làm một cái gì đó như thế này:

thread.start_new_thread (myFuncThatDoesZGrep) 

Thu kết quả, bạn có thể làm một cái gì đó như thế này:

class MyThread(threading.Thread): 
    def run(self): 
     self.finished = False 
     # Your code to run the command here. 
     blahBlah() 
     # When finished.... 
     self.finished = True 
     self.results = [] 

Chạy chuỗi như đã nêu ở trên trong liên kết trên multithr eading. Khi đối tượng thread của bạn có myThread.finished == True, thì bạn có thể thu thập kết quả qua myThread.results.

+0

Với phương pháp này, làm thế nào tôi có thể nhận được đầu ra của mỗi khi các chủ đề kết thúc chạy? Và tôi đã sử dụng một quy trình, tại sao một thread lại hoạt động nhưng không phải là một tiến trình? – liarspocker

+0

Quá trình sẽ hoạt động - câu trả lời được nêu khác đề xuất rằng bạn thực hiện công việc đa tiến trình trong trình bao thực tế, bằng cách sử dụng &. Trong cách tiếp cận đó, bạn chỉ có một quy trình python, nhưng nó sinh ra nhiều quá trình shell. Trong phương pháp tiếp cận đa luồng, bạn có nhiều quy trình python, nhưng một quá trình shell cho mỗi quá trình python. Để thu thập các kết quả từ nhiều luồng, bạn sẽ tạo các lớp học mà phân lớp Thread. Sau đó, đặt kết quả từ một luồng làm dữ liệu đối tượng trong lớp đó. – FrobberOfBits

+0

Nhưng đó không phải là những gì mã trên đang làm? Tôi đang bắt đầu một quy trình mới cho mỗi đỉnh, sau đó chạy tiến trình con và process_data từ phương thức chạy của nó. – liarspocker

24

Bạn không cần không multiprocessing cũng không threading để chạy subprocesses song song ví dụ .:

#!/usr/bin/env python 
from subprocess import Popen 

# run commands in parallel 
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True) 
      for i in range(5)] 
# collect statuses 
exitcodes = [p.wait() for p in processes] 

nó chạy lệnh 5 vỏ cùng một lúc. Lưu ý: không phải chủ đề nào cũng không phải mô-đun multiprocessing được sử dụng tại đây. Không có điểm để thêm dấu và & vào các lệnh shell: Popen không chờ lệnh hoàn tất. Bạn cần gọi số .wait() một cách rõ ràng.

Đó là thuận lợi nhưng nó không phải là cần thiết để sử dụng chủ đề để thu thập kết quả từ subprocesses:

#!/usr/bin/env python 
from multiprocessing.dummy import Pool # thread pool 
from subprocess import Popen, PIPE, STDOUT 

# run commands in parallel 
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True, 
        stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) 
      for i in range(5)] 

# collect output in parallel 
def get_lines(process): 
    return process.communicate()[0].splitlines() 

outputs = Pool(len(processes)).map(get_lines, processes) 

liên quan: Python threading multiple bash subprocesses?.

Dưới đây là mã ví dụ mà được sản lượng từ vài subprocesses đồng thời trong cùng một thread:

#!/usr/bin/env python3 
import asyncio 
import sys 
from asyncio.subprocess import PIPE, STDOUT 

@asyncio.coroutine 
def get_lines(shell_command): 
    p = yield from asyncio.create_subprocess_shell(shell_command, 
      stdin=PIPE, stdout=PIPE, stderr=STDOUT) 
    return (yield from p.communicate())[0].splitlines() 

if sys.platform.startswith('win'): 
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows 
    asyncio.set_event_loop(loop) 
else: 
    loop = asyncio.get_event_loop() 

# get commands output in parallel 
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"' 
        .format(i=i, e=sys.executable)) for i in range(5)] 
print(loop.run_until_complete(asyncio.gather(*coros))) 
loop.close() 
+0

@ j-f-sebastian Umm ... Tôi bối rối về sự khác biệt giữa đoạn mã # 2 và # 3 trong câu trả lời của bạn là gì. Bạn có thể vui lòng chỉ ra một số tài nguyên hoặc giải thích những gì "được sản lượng ... ** trong cùng một thread **" có nghĩa là? BTW, cảm ơn rất nhiều cho # 2 :) –

+1

@SaheelGodhane: 'multiprocessing.dummy.Pool()' dựa trên giải pháp sử dụng * nhiều * (nhiều/nhiều hơn một) chủ đề. Giải pháp dựa trên 'asyncio' sử dụng một chuỗi * đơn * ở đây. Để hiểu cách thực hiện một số việc cùng một lúc * trong cùng một chuỗi *, hãy xem [Python Concurrency From the Ground Up: LIVE!] (Http://www.youtube.com/watch?v=MCs5OvhV9S4) – jfs

+0

Ví dụ tuyệt vời! Tôi đã cố gắng thực hiện các đoạn mã số 1 với hàm subprocess.run() mới nhưng có vẻ như nó sẽ không hoạt động vì hàm đó luôn chờ quá trình kết thúc. Tôi đã phải chuyển về sử dụng Popen để thay thế. – Jared

Các vấn đề liên quan