2016-06-18 26 views
12

Tôi muốn đọc nhiều tệp lớn sử dụng luồng akka để xử lý từng dòng. Hãy tưởng tượng rằng mỗi khóa bao gồm một ("định danh" -> "giá trị"). Nếu tìm thấy "số nhận dạng" mới, tôi muốn lưu nó trong cơ sở dữ liệu và lưu "giá trị" của nó, nếu không, nếu định danh đã được tìm thấy trong khi xử lý luồng dòng, tôi chỉ muốn lưu "giá trị". Vì lý do đó, tôi nghĩ rằng tôi cần một loại lưu thông trạng thái đệ quy để giữ các định danh đã được tìm thấy trong một Bản đồ. Tôi nghĩ rằng tôi sẽ nhận được trong luồng này một cặp (newLine, contextWithIdentifiers).Luồng Akka. Trạng thái stateful trong một dòng chảy

Tôi vừa mới bắt đầu xem xét dòng suối Akka. Tôi đoán tôi có thể quản lý bản thân mình để làm công cụ xử lý không quốc tịch nhưng tôi không có đầu mối về cách giữ "contextWithIdentifiers". Tôi đánh giá cao nếu ai đó không thể chỉ cho tôi một hướng tốt.

Tôi đang sử dụng Scala.

+2

Tôi đánh giá cao việc bạn yêu cầu điều này. Đó là một yêu cầu đơn giản, nhưng việc tìm kiếm câu trả lời có ý nghĩa với mã mẫu có vẻ phức tạp. Đây là người duy nhất tôi tìm thấy! – akauppi

Trả lời

17

Có thể những thứ như statefulMapConcat có thể giúp bạn ở đó.

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import scala.util.Random._ 
import scala.math.abs 
import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

//encapsulating your input 
case class IdentValue(id: Int, value: String) 
//some random generated input 
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere")) 

val stateFlow = Flow[IdentValue].statefulMapConcat{() => 
    //state with already processed ids 
    var ids = Set.empty[Int] 
    identValue => if (ids.contains(identValue.id)) { 
    //save value to DB 
    println(identValue.value) 
    List(identValue) 
    } else { 
    //save both to database 
    println(identValue) 
    ids = ids + identValue.id 
    List(identValue) 
    } 
} 

Source(identValues) 
    .via(stateFlow) 
    .runWith(Sink.seq) 
    .onSuccess { case identValue => println(identValue) } 
+0

Cảm ơn bạn đã nhập mã. Tôi sẽ đánh giá cao hơn một chút các loại ở giữa, vì có một nhà máy() => ... tham gia. Bạn có biết tại sao không có phương thức '.statefulMap'? – akauppi

Các vấn đề liên quan