GỢI Ý: Câu trả lời này được dựa trên akka-stream-experimental
phiên bản 2.0-M2
. API có thể hơi khác trong các phiên bản khác.
Một cách dễ dàng để đóng kết nối bằng cách sử dụng một PushStage
:
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
Mỗi phần tử được nhận tại phía khách hàng hoặc tại phía máy chủ (và nói chung tất cả các yếu tố đó đi qua một Flow
) đi qua một thành phần Stage
như vậy. Ở Akka, sự trừu tượng đầy đủ được gọi là GraphStage
, bạn có thể tìm thêm thông tin trong số official documentation.
Với PushStage
chúng tôi có thể xem các phần tử đến cụ thể về giá trị của chúng và thay đổi ngữ cảnh cho phù hợp. Trong ví dụ trên, khi nhận được thông báo goodbye
, chúng tôi hoàn thành ngữ cảnh nếu không, chúng tôi chỉ chuyển tiếp giá trị thông qua phương thức push
.
Bây giờ, chúng ta có thể kết nối các thành phần closeClient
đến một dòng tùy ý thông qua phương pháp transform
:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
Dòng chảy trên nhận được một ByteString
và trả về một ByteString
, có nghĩa là nó có thể được kết nối với connection
qua join
phương pháp. Bên trong dòng chảy, trước tiên chúng tôi chuyển đổi các byte thành chuỗi trước khi chúng tôi gửi chúng đến closeClient
. Nếu PushStage
không kết thúc luồng, phần tử sẽ được chuyển tiếp trong luồng, tại đó nó được giảm xuống và được thay thế bằng một số đầu vào từ stdin, sau đó được gửi lại qua dây. Trong trường hợp luồng kết thúc, tất cả các bước xử lý luồng tiếp theo sau khi thành phần giai đoạn sẽ bị xóa - luồng hiện đã bị đóng.
Cảm ơn bạn rất nhiều!Có vẻ như thứ tôi đang tìm kiếm. – Tvaroh
Thật không may điều này không đóng kết nối websocket (trên akka-http) khi áp dụng cho dòng 'trong'. Có lẽ nó cũng cần nguồn 'out' là 'finish'ed. – Tvaroh
Có, akka-http đóng kết nối khi cả hai kết thúc: 'Flow' - by' finish'ing nó như được gợi ý và 'Source' (của đầu ra) - bằng cách dừng tác nhân bên dưới của nó. – Tvaroh