2015-12-15 20 views
6

Trong trường hợp của tôi, máy khách gửi thông báo websocket "tạm biệt" và tôi cần phải đóng kết nối được thiết lập trước đó ở phía máy chủ.Đóng kết nối websocket akka-http từ máy chủ

Từ AKKA-http docs:

kết nối Bế mạc có thể bằng cách hủy các luồng kết nối đến máy chủ từ logic của bạn (ví dụ như bằng cách kết nối hạ lưu của mình cho một Sink.cancelled và thượng nguồn của mình cho một Source.empty) . Cũng có thể tắt ổ cắm của máy chủ bằng cách hủy kết nối nguồn IncomingConnection.

Nhưng nó không phải là rõ ràng với tôi làm thế nào để làm điều đó có tính đến tài khoản đó SinkSource được thiết lập một lần khi đàm phán một kết nối mới:

(get & path("ws")) { 
    optionalHeaderValueByType[UpgradeToWebsocket]() { 
    case Some(upgrade) ⇒ 
     val connectionId = UUID() 
     complete(upgrade.handleMessagesWithSinkSource(sink, source)) 
    case None ⇒ 
     reject(ExpectedWebsocketRequestRejection) 
    } 
} 

Trả lời

3

GỢI Ý: Câu trả lời này được dựa trên akka-stream-experimental phiên bản 2.0-M2. API có thể hơi khác trong các phiên bản khác.


Một cách dễ dàng để đóng kết nối bằng cách sử dụng một PushStage:

import akka.stream.stage._ 

val closeClient = new PushStage[String, String] { 
    override def onPush(elem: String, ctx: Context[String]) = elem match { 
    case "goodbye" ⇒ 
     // println("Connection closed") 
     ctx.finish() 
    case msg ⇒ 
     ctx.push(msg) 
    } 
} 

Mỗi phần tử được nhận tại phía khách hàng hoặc tại phía máy chủ (và nói chung tất cả các yếu tố đó đi qua một Flow) đi qua một thành phần Stage như vậy. Ở Akka, sự trừu tượng đầy đủ được gọi là GraphStage, bạn có thể tìm thêm thông tin trong số official documentation.

Với PushStage chúng tôi có thể xem các phần tử đến cụ thể về giá trị của chúng và thay đổi ngữ cảnh cho phù hợp. Trong ví dụ trên, khi nhận được thông báo goodbye, chúng tôi hoàn thành ngữ cảnh nếu không, chúng tôi chỉ chuyển tiếp giá trị thông qua phương thức push.

Bây giờ, chúng ta có thể kết nối các thành phần closeClient đến một dòng tùy ý thông qua phương pháp transform:

val connection = Tcp().outgoingConnection(address, port) 

val flow = Flow[ByteString] 
    .via(Framing.delimiter(
     ByteString("\n"), 
     maximumFrameLength = 256, 
     allowTruncation = true)) 
    .map(_.utf8String) 
    .transform(() ⇒ closeClient) 
    .map(_ ⇒ StdIn.readLine("> ")) 
    .map(_ + "\n") 
    .map(ByteString(_)) 

connection.join(flow).run() 

Dòng chảy trên nhận được một ByteString và trả về một ByteString, có nghĩa là nó có thể được kết nối với connection qua join phương pháp. Bên trong dòng chảy, trước tiên chúng tôi chuyển đổi các byte thành chuỗi trước khi chúng tôi gửi chúng đến closeClient. Nếu PushStage không kết thúc luồng, phần tử sẽ được chuyển tiếp trong luồng, tại đó nó được giảm xuống và được thay thế bằng một số đầu vào từ stdin, sau đó được gửi lại qua dây. Trong trường hợp luồng kết thúc, tất cả các bước xử lý luồng tiếp theo sau khi thành phần giai đoạn sẽ bị xóa - luồng hiện đã bị đóng.

+0

Cảm ơn bạn rất nhiều!Có vẻ như thứ tôi đang tìm kiếm. – Tvaroh

+0

Thật không may điều này không đóng kết nối websocket (trên akka-http) khi áp dụng cho dòng 'trong'. Có lẽ nó cũng cần nguồn 'out' là 'finish'ed. – Tvaroh

+0

Có, akka-http đóng kết nối khi cả hai kết thúc: 'Flow' - by' finish'ing nó như được gợi ý và 'Source' (của đầu ra) - bằng cách dừng tác nhân bên dưới của nó. – Tvaroh

2

này có thể được thực hiện bằng cách sau đây trong hiện tại (2.4.14) phiên bản của AKKA dòng

package com.trackabus.misc 

import akka.stream.stage._ 
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 

// terminates the flow based on a predicate for a message of type T 
// if forwardTerminatingMessage is set the message is passed along the flow 
// before termination 
// if terminate is true the stage is failed, if it is false the stage is completed 
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true) 
    extends GraphStage[FlowShape[T, T]] 
{ 
    val in = Inlet[T]("TerminateFlowStage.in") 
    val out = Outlet[T]("TerminateFlowStage.out") 
    override val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 

     setHandlers(in, out, new InHandler with OutHandler { 
     override def onPull(): Unit = { pull(in) } 

     override def onPush(): Unit = { 
      val chunk = grab(in) 

      if (pred(chunk)) { 
      if (forwardTerminatingMessage) 
       push(out, chunk) 
      if (terminate) 
       failStage(new RuntimeException("Flow terminated by TerminateFlowStage")) 
      else 
       completeStage() 
      } 
      else 
      push(out, chunk) 
     } 
     }) 
    } 
} 

Để sử dụng nó xác định giai đoạn của bạn

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe]) 

và sau đó bao gồm nó như là một phần của luồng

.via(termOnKillMe) 
Các vấn đề liên quan