2017-08-26 50 views
25

Tôi có một đối tượng Connection được sử dụng để chứa đọc và viết dòng asyncio kết nối:Cách chính xác để thu được từ luồng là gì?

class Connection(object): 

    def __init__(self, stream_in, stream_out): 
     object.__init__(self) 

     self.__in = stream_in 
     self.__out = stream_out 

    def read(self, n_bytes : int = -1): 
     return self.__in.read(n_bytes) 

    def write(self, bytes_ : bytes): 
     self.__out.write(bytes_) 
     yield from self.__out.drain() 

Về phía server, connected tạo ra một đối tượng Connection mỗi khi một client kết nối, sau đó đọc 4 byte.

@asyncio.coroutine 
def new_conection(stream_in, stream_out): 
    conn = Connection(stream_in, stream_out) 
    data = yield from conn.read(4) 
    print(data) 

Và ở phía máy khách, 4 byte được viết ra.

@asyncio.coroutine 
def client(loop): 
    ... 
    conn = Connection(stream_in, stream_out) 
    yield from conn.write(b'test') 

này làm việc gần như như mong đợi, nhưng tôi phải yield from mỗi readwrite gọi. Tôi đã thử yield from ing từ bên Connection:

def read(self, n_bytes : int = -1): 
    data = yield from self.__in.read(n_bytes) 
    return data 

Nhưng thay vì nhận được dữ liệu, tôi nhận được một kết quả như

<generator object StreamReader.read at 0x1109983b8> 

Nếu tôi gọi readwrite từ nhiều nơi, tôi không muốn lặp lại mỗi lần yield from; thay vì giữ chúng bên trong Connection. Mục tiêu cuối cùng của tôi là cắt giảm chức năng new_conection của tôi như thế này:

@asyncio.coroutine 
def new_conection(stream_in, stream_out): 
    conn = Connection(stream_in, stream_out) 
    print(conn.read(4)) 
+0

Tại sao bạn phải sinh lời? Nếu bạn không kiếm được từ conn.read (4), có vẻ như tôi chỉ đơn giản trả về một đối tượng byte. Đó là những gì bạn đang tìm kiếm ở đây? – RageCage

+0

@RageCage: Không có 'yield from'ing,' conn.read (4) 'vẫn trả về một trình tạo:' ' –

+0

Xin lỗi tôi nên làm rõ; nếu bạn không thu được từ lần lặp đầu tiên của conn.read() (phiên bản dòng đơn) kết quả là gì? – RageCage

Trả lời

6

StreamReader.read is a coroutine, lựa chọn duy nhất của bạn để gọi nó là một) gói nó trong một Task hoặc Future và chạy mà qua một vòng lặp sự kiện, b) await ing nó từ coroutine định nghĩa với async def, hoặc c) sử dụng yield from với nó từ coroutine được định nghĩa là một hàm được trang trí với @asyncio.coroutine.

Kể từ Connection.read được gọi từ một vòng lặp sự kiện (thông qua coroutine new_connection), bạn không thể tái sử dụng mà sự kiện vòng lặp để chạy một Task hoặc Future cho StreamReader.read: event loops can't be started while they're already running. Bạn có thể phải stop the event loop (tai hại và có thể không thể làm đúng) hoặc create a new event loop (lộn xộn và đánh bại mục đích sử dụng coroutines). Không ai trong số đó là mong muốn, do đó, Connection.read cần phải là một coroutine hoặc một chức năng async.

Hai tùy chọn còn lại (await trong một async def coroutine hoặc yield from trong một @asyncio.coroutine - chức năng được phân bổ) chủ yếu là tương đương. Sự khác biệt duy nhất là async def and await were added in Python 3.5, vì vậy đối với 3,4, sử dụng yield from@asyncio.coroutine là lựa chọn duy nhất (coroutines và asyncio không tồn tại trước 3.4, vì vậy các phiên bản khác không liên quan). Cá nhân, tôi thích sử dụng async defawait, vì việc xác định coroutines với async def sạch hơn và rõ ràng hơn với trang trí.

Tóm lại: có Connection.readnew_connection được coroutines (sử dụng một trong hai trang trí hoặc async từ khóa), và sử dụng await (hoặc yield from) khi gọi coroutines khác (await conn.read(4) trong new_connection, và await self.__in.read(n_bytes) trong Connection.read).

+1

Ah, câu trả lời rất hay Mego! Điều này được viết rõ ràng bởi một người biết những gì họ nói về. Tôi đã học được nhiều từ việc đọc nó. +1 –

1

Tôi tìm thấy một đoạn của StreamReader source code trên đường dây 620 thực sự là một ví dụ hoàn hảo của việc sử dụng của hàm.

Trong câu trả lời trước, tôi bỏ qua thực tế là self.__in.read(n_bytes) không chỉ là một coroutine (mà tôi nên biết là xem xét từ mô-đun XD asyncio) nhưng nó mang lại kết quả trên đường. Vì vậy, nó là trong thực tế, một máy phát điện, và bạn sẽ cần phải năng suất từ ​​nó.

Vay vòng lặp này từ mã nguồn, chức năng đọc của bạn sẽ giống như thế này:

def read(self, n_bytes : int = -1): 
    data = bytearray() #or whatever object you are looking for 
    while 1: 
     block = yield from self.__in.read(n_bytes) 
     if not block: 
      break 
     data += block 
    return data 

self.__in.read(n_bytes) là một máy phát điện, bạn phải tiếp tục mang lại từ nó cho đến khi nó mang lại một kết quả trống để báo hiệu kết thúc đọc. Bây giờ chức năng đọc của bạn sẽ trả về dữ liệu chứ không phải máy phát. Bạn sẽ không phải kiếm lợi nhuận từ phiên bản conn.read() này.

+0

Sử dụng chức năng chính xác như bạn đã cung cấp, tôi vẫn nhận được một đối tượng máy phát ('Connection.read'). –

+0

Bạn vẫn đang sử dụng tính năng gọi điện liên lạc? Thử in dữ liệu và gõ (dữ liệu) vào chức năng đọc để xem nó là gì trước khi trở về. – RageCage

+0

Không, tôi đã xóa nó và thử 'data = conn.read (4)' thay thế. Nó là một máy phát điện. –

Các vấn đề liên quan