2016-05-10 16 views
8

Tôi đang cố gắng tích hợp luồng dòng akka vào ứng dụng Play 2.5 của mình. Ý tưởng là bạn có thể truyền trực tuyến trong một bức ảnh, sau đó ghi nó vào đĩa dưới dạng tệp thô, phiên bản hình thu nhỏ và phiên bản mờ.Làm thế nào để lắp ráp một dòng Akka chìm từ nhiều tập tin ghi?

tôi quản lý để làm việc này bằng cách sử dụng một cái gì đó đồ thị như thế này:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray}) 
            .map(_.result().toArray) 

def toByteArray = Flow[ByteString].map(b => b.toArray) 

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder => 
    import GraphDSL.Implicits._ 
    val streamFan = builder.add(Broadcast[ByteString](3)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 
    val output = builder.add(Flow[ByteString].map(x => Success(Done))) 

    val rawFileSink = FileIO.toFile(file) 
    val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
    val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

    streamFan.out(0) ~> rawFileSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 
    streamFan.out(2) ~> output.in 

    byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink 
    byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink 

    FlowShape(streamFan.in, output.out) 
}) 

graph 

}

Sau đó, tôi dây nó để điều khiển trò chơi của tôi sử dụng một ắc như thế này:

val sink = Sink.head[Try[Done]] 

val photoStorageParser = BodyParser { req => 
    Accumulator(sink).through(graph).map(Right.apply) 
} 

Vấn đề là hai bồn chứa tệp đã xử lý của tôi không hoàn thành và tôi nhận được kích thước bằng không cho cả hai tệp được xử lý, nhưng không phải là tệp thô. Lý thuyết của tôi là bộ tích lũy chỉ chờ một trong các đầu ra của quạt của tôi, vì vậy khi luồng đầu vào hoàn thành và byteAccumulator của tôi phun ra toàn bộ tệp, vào lúc xử lý xong, đã có giá trị vật hoá từ đầu ra .

Vì vậy, câu hỏi của tôi là:
Tôi có đi đúng hướng với điều này theo cách tiếp cận của tôi không? Hành vi mong đợi để chạy biểu đồ như thế này là gì? Làm thế nào tôi có thể mang tất cả các bồn của mình lại với nhau để tạo thành một bồn rửa cuối cùng?

+0

Tôi cũng nghĩ rằng lý do là các luồng không được hợp nhất sau khi xử lý. Bạn đã thử 'Sink.combine' (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API) chưa? – devkat

+0

Vâng, tôi đã cho Sink.combine đi, nhưng điều đó hợp nhất nhiều bồn rửa để gửi _to_ như một fan hâm mộ ra ngoài. Tôi nghĩ rằng tôi đang tìm kiếm một fan hâm mộ, nhưng có vẻ như bạn không thể làm điều đó với các nguồn chỉ chìm! – Tompey

+0

Đây có vẻ là một ví dụ tương tự: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Có lẽ bạn phải trả về một 'SinkShape' thay vì một' FlowShape' để khai báo rằng luồng của bạn đã kết thúc? – devkat

Trả lời

7

Ok, sau một chút giúp đỡ (Andreas đã đi đúng hướng), tôi đã đến giải pháp này mà hiện các trick:

val rawFileSink = FileIO.toFile(file) 
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) { 
    implicit builder => (rawSink, thumbSink, waterSink) => { 
    val streamFan = builder.add(Broadcast[ByteString](2)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 

    streamFan.out(0) ~> rawSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink 
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink 

    SinkShape(streamFan.in) 
    } 
}) 

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done))) 

Sau đó nó chết từng gọi này từ chơi:

val photoStorageParser = BodyParser { req => 
    Accumulator(theSink).map(Right.apply) 
} 

def createImage(path: String) = Action(photoStorageParser) { req => 
    Created 
} 
+0

cảm ơn người đàn ông, tôi chỉ có một nhiệm vụ tương tự và không thể tìm ra cách chờ đợi cho tất cả các tương lai vật chất. Giải pháp của bạn đã giúp rất nhiều và nó hoạt động! –

+0

Xin chào! Làm thế nào về số lượng biến của bồn rửa cho sự kết hợp? – Alexander

Các vấn đề liên quan