Tôi có một tệp thực thi mà tôi gọi bằng cách sử dụng subprocess.Popen. Sau đó, tôi dự định sẽ cung cấp cho nó một số dữ liệu thông qua stdin bằng cách sử dụng một luồng đọc giá trị của nó từ một Hàng đợi mà sau này sẽ được điền vào một luồng khác. Đầu ra phải được đọc bằng cách sử dụng ống stdout trong một luồng khác và được sắp xếp lại trong một Hàng đợi.python: đọc đầu ra của tiến trình con trong các chủ đề
Theo như tôi hiểu từ nghiên cứu trước đây của tôi, việc sử dụng các chuỗi với Hàng đợi là thực hành tốt.
Thực thi bên ngoài, thật không may, sẽ không nhanh chóng cho tôi câu trả lời cho mỗi dòng được nối vào, do đó các chu trình viết đơn giản, đọc không phải là một lựa chọn. Việc thực hiện thực hiện một số đa luồng nội bộ và tôi muốn đầu ra ngay sau khi nó trở nên có sẵn, do đó các chủ đề đọc bổ sung.
Như một ví dụ để kiểm tra thực thi sẽ chỉ xáo trộn mỗi dòng (shuffleline.py):
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
Xin lưu ý rằng đây là đã như không có bộ đệm càng tốt. Hay phải không? Đây là tước xuống chương trình của tôi kiểm tra:
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
này cung cấp cho các kết quả như sau (python2.7):
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
sau đó không có gì trong 2 giây ...
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
Các interleaving lúc đầu dự kiến. Tuy nhiên, đầu ra của quy trình con không xuất hiện cho đến khi sau khi tiến trình con kết thúc. Với nhiều đường ống trong tôi nhận được một số đầu ra, do đó tôi giả định một vấn đề bộ nhớ đệm trong ống stdout. Theo các câu hỏi khác được đăng ở đây, stdout flushing (trong subprocess) sẽ hoạt động, ít nhất là trên Linux.
Cảm ơn, đó là giải pháp! – muckl
Tôi có thể hỏi tại sao hỗn hợp của tiến trình con và luồng là một cách tiếp cận khủng khiếp như vậy? Nó có vẻ thanh lịch hơn so với gọi I/O không bị chặn nhiều lần trong khi không có gì xảy ra. Rõ ràng các chủ đề không nên truy cập vào bất kỳ cơ sở dữ liệu không phải luồng an toàn nào nhưng chỉ đọc và viết từ hoặc đến một Hàng đợi có vẻ an toàn. Các thay đổi trong backport Python3.2 có quan trọng đối với trường hợp đơn giản như tôi không? – muckl
Vấn đề với chủ đề và quy trình phụ cụ thể là vấn đề trộn chủ đề và ngã ba. Xem http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them và các bài viết khác. Phương thức backprocess của Python 3.2 hoạt động xung quanh các vấn đề đó. Đối với các chủ đề nói chung, vấn đề chính là chúng khó kiểm soát và gỡ lỗi. Ví dụ, bạn không thể giết chúng từ "bên ngoài" các chủ đề, vì vậy nếu một sợi bị mắc kẹt trong một đọc hoặc viết không có gì bạn có thể làm gì về nó. –