Như đã trình bày trong akka streams documentation tôi đã cố gắng để tạo ra một hồ bơi của người lao động (dòng chảy):Pool của người lao động với Akka Streams
def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
import GraphDSL.Implicits._
Flow.fromGraph(GraphDSL.create() { implicit b =>
val balancer = b.add(Balance[In](workerCount))
val merge = b.add(Merge[Out](workerCount))
for (_ <- 1 to workerCount) {
balancer ~> worker ~> merge
}
FlowShape(balancer.in, merge.out)
})
}
sau đó tôi đã sử dụng chức năng này để chạy một dòng chảy song song:
def main(args: Array[String]) {
val system = ActorSystem()
implicit val mat = ActorMaterializer.create(system)
val flow = Flow[Int].map(e => {
println(e)
Thread.sleep(1000) // 1 second
e
})
Source(Range.apply(1, 10).toList)
.via(balancer(flow, 3))
.runForeach(e => {})
}
Tôi nhận được kết quả mong đợi 1, 2, 3, 4, 5, 6, 7, 8, 9
nhưng các con số xuất hiện ở tốc độ 1 mỗi giây (không có tính song song). Tôi đang làm gì sai?
gì về bối cảnh thực hiện? Nếu bạn đang sử dụng một hồ bơi có kích thước cố định với một điều này là bình thường –
Điều đó có nghĩa là kích thước của bối cảnh mặc định là 1? Bạn có thể vui lòng nêu chi tiết cách ưa thích để cấu hình ngữ cảnh thực thi là gì? – Mihai238
Không bối cảnh mặc định nào không cố định, có thể bạn đang nhập ngữ cảnh ngầm toàn cục, điều này phụ thuộc vào quá nhiều thứ như phiên bản, bạn có thể thử 'implicit val ec = ExecutionContext.fromExecutor (Executors.newFixedThreadPool (10))' –