2017-11-18 35 views
7

Có một bộ kết hợp dòng Akka để thực hiện những điều sau (hoặc một cái gì đó có hiệu lực) không? (Hãy gọi nó là and cho bây giờ.)Làm thế nào để soạn hai luồng cạnh nhau?

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat] 

Ngữ nghĩa sẽ là bất cứ điều gì các nguồn, các yếu tố của nó sẽ được chuyển đến cả Flow s, và đầu ra của họ sẽ được kết hợp thành một mới Flow như một tuple. (Đối với những người quen thuộc với các mũi tên từ lý thuyết loại hương vị chức năng lập trình, tôi đang tìm kiếm một cái gì đó giống như &&&.)

Có hai combinators trong thư viện trông có liên quan, cụ thể là zipalsoTo. Nhưng người trước đây chấp nhận một số SourceShape và sau đó là số SinkShape. Không chấp nhận một số GraphShape. Tại sao điều này là trường hợp?

trường hợp sử dụng của tôi là một cái gì đó như sau:

someSource 
    .via(someFlowThatReturnsUnit.and(Flow.apply)) 
    .runWith(someSink) 

Không tìm thấy cái gì đó như .and, tôi sửa đổi ban đầu Flow của tôi như thế này:

someSource 
    .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs) 
    .runWith(someSink) 

này hoạt động, nhưng tôi đang tìm kiếm một sạch hơn, giải pháp tổng hợp hơn.

+1

Một dòng chảy không phải là nghiêm chỉnh 1: 1 (1 đầu vào cho 1 đầu ra) do đó, một combinator chung như vậy sẽ khó khăn. (Bạn có thể sử dụng GraphDSL và sử dụng Broadcast + Merge) –

Trả lời

6

Thông báo

Như Viktor Klang lưu ý trong các ý kiến: nén vào một Tuple2[O,O2] chỉ khả thi khi nó được biết rằng cả hai dòng chảy, flow1 & flow2, là 1: 1 đối với yếu tố đầu vào đếm và đầu ra với đếm phần tử.

Graph Dựa Giải pháp

Một tuple xây dựng có thể được tạo ra bên trong một Graph. Trong thực tế, câu hỏi của bạn gần như hoàn hảo phù hợp với ví dụ giới thiệu:

enter image description here

Mở rộng mẫu mã trong các liên kết, bạn có thể sử dụng BroadcastZip

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.ignore 

    val bcast = builder.add(Broadcast[Int](2)) 

    val merge = builder.add(Zip[Int, Int]()) //different than link 

    val f1, f2, f4 = Flow[Int].map(_ + 10) 

    val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link 

    in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
       bcast ~> f4 ~> merge 
    ClosedShape 
})//end RunnableGraph.fromGraph 

Hơi Hacky Suối Giải pháp

Nếu bạn đang tìm kiếm một giải pháp luồng thuần túy, có thể sử dụng các luồng trung gian nhưng Mat sẽ không được duy trì và nó liên quan đến khai thực hiện các 2 dòng cho mỗi yếu tố đầu vào:

def andFlows[I, O, O2] (maxConcurrentSreams : Int) 
         (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
         (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[O] = Source 
          .single(i) 
          .via(flow1) 
          .to(Sink.head[O]) 
          .run() 

    val o2 : Future[O2] = Source 
          .single(i) 
          .via(flow2) 
          .to(Sink.head[O2]) 
          .run() 

    o zip o2 
    }//end Flow[I].mapAsync 

Generic Nén

Nếu bạn muốn chắc nén chung, đối với hầu hết dòng chảy, sau đó loại ra này sẽ phải là (Seq[O], Seq[O2]).Đây là loại có thể được tạo ra bằng cách sử dụng Sink.seq thay vì Sink.head trong andFlows chức năng trên:

def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int) 
           (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
           (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[Seq[O]] = Source 
           .single(i) 
           .via(flow1) 
           .to(Sink.seq[O]) 
           .run() 

    val o2 : Future[Seq[O2]] = Source 
           .single(i) 
           .via(flow2) 
           .to(Sink.seq[O2]) 
           .run() 

    o zip o2 
    }//end Flow[I].mapAsync 
Các vấn đề liên quan