2016-07-12 17 views
10

Tôi đã sử dụng thành công FileIO để truyền nội dung của tệp, tính toán một số phép biến đổi cho mỗi dòng và tổng hợp/giảm kết quả.Cách thích hợp để dừng luồng Akka theo điều kiện

Bây giờ tôi có một trường hợp sử dụng khá cụ thể, nơi tôi muốn dừng luồng khi có điều kiện, do đó không nhất thiết phải đọc toàn bộ tệp nhưng quá trình hoàn tất càng sớm càng tốt. Cách được khuyến nghị để đạt được điều này là gì?

+2

Nếu điều kiện dựa trên nội dung luồng, 'Source.takewhile' (http://doc.akka.io/api/akka/2.4.8/index.html#[email protected] (p: Out => Boolean): FlowOps.this.Repr [Out]) sẽ hoạt động. – devkat

Trả lời

16

Nếu điều kiện dừng là "ở bên ngoài của con suối"

Có một tòa nhà khối tiên tiến được gọi là KillSwitch mà bạn có thể sử dụng để làm điều này: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html Các dòng sẽ được tắt khi chuyển đổi kill là được thông báo.

Nó có các phương pháp như abort(reason)/shutdown vv, xem tại đây cho nó là API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

tài liệu tham khảo ở đây: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

sử dụng Ví dụ sẽ là:

val countingSrc = Source(Stream.from(1)).delay(1.second, 
    DelayOverflowStrategy.backpressure) 
val lastSnk = Sink.last[Int] 

val (killSwitch, last) = countingSrc 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(lastSnk)(Keep.both) 
    .run() 

doSomethingElse() 

killSwitch.shutdown() 

Await.result(last, 1.second) shouldBe 2 

Nếu điều kiện dừng nằm trong luồng

Bạn có thể sử dụng takeWhile để thể hiện bất kỳ điều kiện nào, mặc dù đôi khi take hoặc limit cũng có thể đủ "mất 10 lnes".

Nếu logic của bạn rất tiên tiến, bạn có thể xây dựng một giai đoạn đặc biệt để xử lý logic đặc biệt đó bằng cách sử dụng statefulMapConcat cho phép thể hiện bất kỳ thứ gì - để bạn có thể hoàn thành luồng bất cứ khi nào bạn muốn "từ bên trong".

+0

Điều gì sẽ xảy ra nếu tôi cần bật và tắt cho luồng liên tục để cuộc gọi chức năng giảm của tôi phát ra dữ liệu tổng hợp (được nhóm) vào Sink? Tôi có thể sử dụng takeWhile (p), và swithc p đúng và sai? – zt1983811

Các vấn đề liên quan