2016-03-25 15 views
5

Như đã trình bày trong akka streams documentation tôi đã cố gắng để tạo ra một hồ bơi của người lao động (dòng chảy):Pool của người lao động với Akka Streams

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = { 
    import GraphDSL.Implicits._ 

    Flow.fromGraph(GraphDSL.create() { implicit b => 
     val balancer = b.add(Balance[In](workerCount)) 
     val merge = b.add(Merge[Out](workerCount)) 

     for (_ <- 1 to workerCount) { 
     balancer ~> worker ~> merge 
     } 
     FlowShape(balancer.in, merge.out) 
    }) 
    } 

sau đó tôi đã sử dụng chức năng này để chạy một dòng chảy song song:

def main(args: Array[String]) { 
    val system = ActorSystem() 
    implicit val mat = ActorMaterializer.create(system) 

    val flow = Flow[Int].map(e => { 
     println(e) 
     Thread.sleep(1000) // 1 second 
     e 
    }) 

    Source(Range.apply(1, 10).toList) 
     .via(balancer(flow, 3)) 
     .runForeach(e => {}) 
    } 

Tôi nhận được kết quả mong đợi 1, 2, 3, 4, 5, 6, 7, 8, 9 nhưng các con số xuất hiện ở tốc độ 1 mỗi giây (không có tính song song). Tôi đang làm gì sai?

+0

gì về bối cảnh thực hiện? Nếu bạn đang sử dụng một hồ bơi có kích thước cố định với một điều này là bình thường –

+0

Điều đó có nghĩa là kích thước của bối cảnh mặc định là 1? Bạn có thể vui lòng nêu chi tiết cách ưa thích để cấu hình ngữ cảnh thực thi là gì? – Mihai238

+0

Không bối cảnh mặc định nào không cố định, có thể bạn đang nhập ngữ cảnh ngầm toàn cục, điều này phụ thuộc vào quá nhiều thứ như phiên bản, bạn có thể thử 'implicit val ec = ExecutionContext.fromExecutor (Executors.newFixedThreadPool (10))' –

Trả lời

1

Như Endre Varga đã chỉ ra, luồng chính nó phải được đánh dấu bằng .async.

Nhưng ngay cả khi đó, hành vi không xác định bởi vì các giai đoạn không đồng bộ có kích thước bộ đệm mặc định là 16 và trình cân bằng có thể gửi tất cả thư đến cùng một nhân viên.

Kết quả là balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge sẽ dẫn đến hành vi mong muốn.

Đối với một câu trả lời đưa ra bởi một thành viên dự án xem: https://github.com/akka/akka/issues/20146#issuecomment-201381356

3

Tài liệu trong phần đó đã lỗi thời và sẽ được khắc phục trong bản phát hành tiếp theo. Về cơ bản tất cả những gì bạn cần là gọi số .async trên chính luồng đó. Bằng cách này, bạn loại vẽ một "hộp" xung quanh dòng chảy (mà bạn có thể tưởng tượng như một hộp với một cổng đầu vào và đầu ra) mà sẽ ngăn chặn nung chảy trên hộp đó. Bằng cách làm điều này về cơ bản tất cả các công nhân sẽ được trên các diễn viên chuyên dụng. Phần còn lại của biểu đồ (các giai đoạn phát sóng và hợp nhất) sẽ chia sẻ một diễn viên khác (chúng sẽ không chạy trên các diễn viên riêng biệt, hộp async chỉ bảo vệ luồng, mọi thứ bên ngoài sẽ vẫn được hợp nhất).

+0

Tôi tin rằng đây là cách nó nên được, nhưng tôi cũng tin rằng rằng đây không phải là nó như thế nào. 'for (i <- 1 to workerCount) {balancer ~> worker.async ~> merge}' dường như không hoạt động. – lpiepiora

+2

Như tôi đã lưu ý trong vé của bạn (và sau khi chơi xung quanh, tôi đã nhầm lẫn), nó chỉ ra rằng kích thước bộ đệm mặc định cho các giai đoạn async là 16 và số dư kết thúc gửi tất cả các tin nhắn đến một giai đoạn duy nhất vì nó báo cáo là vẫn còn có bộ đệm không gian. Nếu bạn gửi nhiều tin nhắn hơn (như 100) hoặc đặt kích thước bộ đệm của giai đoạn công nhân thành 1, bạn sẽ thấy kết quả mong muốn. –

+0

Vâng, đúng, cũng thiết lập 'waitForAllDownstreams = true' có thể giúp một chút với nó. Tôi nghĩ rằng (tôi đã không thực sự kiểm tra nó), những gì xảy ra là như bạn đã nói các báo cáo hạ lưu đầu tiên, và tất cả các tin nhắn đang được gửi xuống nó.Với 'waitForAllDownstreams' nó đường nối phân phối là tốt hơn một chút – lpiepiora

Các vấn đề liên quan