2012-03-21 27 views
9

Tôi có một tệp thực thi mà tôi gọi bằng cách sử dụng subprocess.Popen. Sau đó, tôi dự định sẽ cung cấp cho nó một số dữ liệu thông qua stdin bằng cách sử dụng một luồng đọc giá trị của nó từ một Hàng đợi mà sau này sẽ được điền vào một luồng khác. Đầu ra phải được đọc bằng cách sử dụng ống stdout trong một luồng khác và được sắp xếp lại trong một Hàng đợi.python: đọc đầu ra của tiến trình con trong các chủ đề

Theo như tôi hiểu từ nghiên cứu trước đây của tôi, việc sử dụng các chuỗi với Hàng đợi là thực hành tốt.

Thực thi bên ngoài, thật không may, sẽ không nhanh chóng cho tôi câu trả lời cho mỗi dòng được nối vào, do đó các chu trình viết đơn giản, đọc không phải là một lựa chọn. Việc thực hiện thực hiện một số đa luồng nội bộ và tôi muốn đầu ra ngay sau khi nó trở nên có sẵn, do đó các chủ đề đọc bổ sung.

Như một ví dụ để kiểm tra thực thi sẽ chỉ xáo trộn mỗi dòng (shuffleline.py):

#!/usr/bin/python -u 
import sys 
from random import shuffle 

for line in sys.stdin: 
    line = line.strip() 

    # shuffle line 
    line = list(line) 
    shuffle(line) 
    line = "".join(line) 

    sys.stdout.write("%s\n"%(line)) 
    sys.stdout.flush() # avoid buffers 

Xin lưu ý rằng đây là đã như không có bộ đệm càng tốt. Hay phải không? Đây là tước xuống chương trình của tôi kiểm tra:

#!/usr/bin/python -u 
import sys 
import Queue 
import threading 
import subprocess 

class WriteThread(threading.Thread): 
    def __init__(self, p_in, source_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_in 
     self.source_queue = source_queue 

    def run(self): 
     while True: 
      source = self.source_queue.get() 
      print "writing to process: ", repr(source) 
      self.pipe.write(source) 
      self.pipe.flush() 
      self.source_queue.task_done() 

class ReadThread(threading.Thread): 
    def __init__(self, p_out, target_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_out 
     self.target_queue = target_queue 

    def run(self): 
     while True: 
      line = self.pipe.readline() # blocking read 
      if line == '': 
       break 
      print "reader read: ", line.rstrip() 
      self.target_queue.put(line) 

if __name__ == "__main__": 

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered 
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

    source_queue = Queue.Queue() 
    target_queue = Queue.Queue() 

    writer = WriteThread(proc.stdin, source_queue) 
    writer.setDaemon(True) 
    writer.start() 

    reader = ReadThread(proc.stdout, target_queue) 
    reader.setDaemon(True) 
    reader.start() 

    # populate queue 
    for i in range(10): 
     source_queue.put("string %s\n" %i) 
    source_queue.put("") 

    print "source_queue empty: ", source_queue.empty() 
    print "target_queue empty: ", target_queue.empty() 

    import time 
    time.sleep(2) # expect some output from reader thread 

    source_queue.join() # wait until all items in source_queue are processed 
    proc.stdin.close() # should end the subprocess 
    proc.wait() 

này cung cấp cho các kết quả như sau (python2.7):

writing to process: 'string 0\n' 
writing to process: 'string 1\n' 
writing to process: 'string 2\n' 
writing to process: 'string 3\n' 
writing to process: 'string 4\n' 
writing to process: 'string 5\n' 
writing to process: 'string 6\n' 
source_queue empty: writing to process: 'string 7\n' 
writing to process: 'string 8\n' 
writing to process: 'string 9\n' 
writing to process: '' 
True 
target_queue empty: True 

sau đó không có gì trong 2 giây ...

reader read: rgsn0i t 
reader read: nrg1sti 
reader read: tis n2rg 
reader read: snt gri3 
reader read: nsri4 tg 
reader read: stir5 gn 
reader read: gnri6ts 
reader read: ngrits7 
reader read: 8nsrt ig 
reader read: sg9 nitr 

Các interleaving lúc đầu dự kiến. Tuy nhiên, đầu ra của quy trình con không xuất hiện cho đến khi sau khi tiến trình con kết thúc. Với nhiều đường ống trong tôi nhận được một số đầu ra, do đó tôi giả định một vấn đề bộ nhớ đệm trong ống stdout. Theo các câu hỏi khác được đăng ở đây, stdout flushing (trong subprocess) sẽ hoạt động, ít nhất là trên Linux.

Trả lời

7

Vấn đề của bạn không có gì để làm mô-đun subprocess, hoặc chủ đề (có vấn đề như chúng), hoặc thậm chí trộn các quy trình con và chủ đề (ý tưởng xấu), thậm chí tệ hơn việc sử dụng các chủ đề để bắt đầu, trừ khi bạn bằng cách sử dụng backport của mô-đun con của Python 3.2 mà bạn có thể lấy từ code.google.com/p/python-subprocess32) hoặc truy cập vào cùng một thứ từ nhiều luồng (như các câu lệnh print của bạn).

Điều gì xảy ra là bộ đệm chương trình shuffleline.py của bạn. Không phải trong đầu ra, nhưng trong đầu vào. Mặc dù nó không phải là rất rõ ràng, khi bạn lặp qua một fileobject, Python sẽ đọc trong khối, thường là 8k byte.Kể từ sys.stdin là một fileobject, for vòng lặp của bạn sẽ đệm cho đến khi EOF hoặc một khối đầy đủ:

for line in sys.stdin: 
    line = line.strip() 
    .... 

Nếu bạn muốn không làm điều này, hoặc là sử dụng một vòng lặp while để gọi sys.stdin.readline() (mà trả '' cho EOF):

while True: 
    line = sys.stdin.readline() 
    if not line: 
     break 
    line = line.strip() 
    ... 

hoặc sử dụng các hình thức hai đối số của iter(), mà tạo ra một iterator mà các cuộc gọi đối số đầu tiên cho đến khi đối số thứ hai ("trọng điểm") được trả về:

for line in iter(sys.stdin.readline, ''): 
    line = line.strip() 
    ... 

Tôi cũng sẽ hồi tưởng nếu tôi không đề xuất không sử dụng đề tài cho điều này, nhưng không chặn I/O trên đường ống của tiến trình thay thế, hoặc thậm chí một cái gì đó giống như twisted.reactor.spawnProcess. như người tiêu dùng và nhà sản xuất.

+0

Cảm ơn, đó là giải pháp! – muckl

+1

Tôi có thể hỏi tại sao hỗn hợp của tiến trình con và luồng là một cách tiếp cận khủng khiếp như vậy? Nó có vẻ thanh lịch hơn so với gọi I/O không bị chặn nhiều lần trong khi không có gì xảy ra. Rõ ràng các chủ đề không nên truy cập vào bất kỳ cơ sở dữ liệu không phải luồng an toàn nào nhưng chỉ đọc và viết từ hoặc đến một Hàng đợi có vẻ an toàn. Các thay đổi trong backport Python3.2 có quan trọng đối với trường hợp đơn giản như tôi không? – muckl

+3

Vấn đề với chủ đề và quy trình phụ cụ thể là vấn đề trộn chủ đề và ngã ba. Xem http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them và các bài viết khác. Phương thức backprocess của Python 3.2 hoạt động xung quanh các vấn đề đó. Đối với các chủ đề nói chung, vấn đề chính là chúng khó kiểm soát và gỡ lỗi. Ví dụ, bạn không thể giết chúng từ "bên ngoài" các chủ đề, vì vậy nếu một sợi bị mắc kẹt trong một đọc hoặc viết không có gì bạn có thể làm gì về nó. –

Các vấn đề liên quan