2016-02-02 42 views
14

Có ai vui lòng giải thích cho tôi sự khác biệt giữa bản đồ và bản đồSync w.r.t AKKA không? In the documentation người ta nói rằngSự khác biệt giữa bản đồ và bản đồAsync

biến đổi Stream và tác dụng phụ liên quan đến dịch vụ bên ngoài phi dòng dựa có thể được thực hiện với mapAsync hoặc mapAsyncUnordered

Tại sao không thể, chúng tôi chỉ đơn giản là chúng tôi lập bản đồ ở đây? Tôi giả định rằng Flow, Source, Sink tất cả sẽ là Monadic trong tự nhiên và do đó bản đồ sẽ làm việc tốt w.r.t sự chậm trễ trong bản chất của những?

Trả lời

31

Chữ ký

Sự khác biệt được đánh dấu tốt nhất trong signatures: Flow.map mất trong một hàm trả về một loại T khi Flow.mapAsync mất trong một hàm trả về một loại Future[T].

thực tiễn Ví dụ

Như một ví dụ, giả sử rằng chúng ta có một chức năng mà các truy vấn cơ sở dữ liệu cho tên đầy đủ của người dùng dựa trên một user id:

type UserID = String 
type FullName = String 

val databaseLookup : UserID => FullName = ??? //implementation unimportant 

Cho một Source của UserID giá trị chúng tôi chỉ có thể sử dụng Flow.map trong Luồng để truy vấn cơ sở dữ liệu và in tên đầy đủ vào bảng điều khiển:

val userIDSource : Source[UserID, _] = ??? 

val stream = 
    userIDSource.via(Flow[UserID].map(databaseLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

Một hạn chế của việc triển khai này là luồng này sẽ chỉ thực hiện 1 truy vấn db tại một thời điểm. Đây sẽ là "nút cổ chai" và có khả năng ngăn thông lượng tối đa trong luồng của chúng tôi. Để cải thiện hiệu suất, chúng tôi chỉ đơn giản là có thể thêm đồng thời bằng cách gói databaseLookup bên trong một Future:

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
    Future { databaseLookup(userID) } 

val concurrentStream = 
    userIDSource.via(Flow[UserID].map(concurrentDBLookup)) 
       .to(Sink.foreach[Future[FullName]](_ foreach println)) 
       .run() 

Vấn đề với phụ lục đồng thời đơn giản này là chúng ta đã loại bỏ một cách hiệu quả backpressure. Kể từ khi Sink chỉ là kéo trong tương lai và thêm một foreach println, mà là tương đối nhanh so với truy vấn cơ sở dữ liệu, dòng sẽ liên tục tuyên truyền nhu cầu đến nguồn và đẻ trứng ra tương lai nhiều hơn nữa. Điều này có nghĩa là không có giới hạn về số lượng databaseLookup chạy đồng thời, mà cuối cùng có thể làm thay đổi cơ sở dữ liệu.

Flow.mapAsync để giải cứu; chúng ta có thể tra cứu db đồng thời trong khi cùng một lúc đóng nắp số lượng tra cứu đồng thời:

val maxLookupCount = 10 

val maxLookupConcurrentStream = 
    userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

Cũng lưu ý rằng Sink.foreach đã đơn giản hơn, nó không còn mất trong một Future[FullName] nhưng chỉ là một FullName để thay thế.

Unordered Async Bản đồ

Nếu bạn không quan tâm đến việc duy trì trật tự của userIds để FullNames bạn có thể sử dụng Flow.mapAsyncUnordered. Điều này sẽ hữu ích nếu tất cả những gì bạn quan tâm là in tất cả các tên đầy đủ nhưng không quan tâm đến thứ tự mà họ đến bàn điều khiển.

+0

Là 'mapAsync' tương tự như áp dụng ranh giới không đồng bộ cho giai đoạn cụ thể đó không? Theo tài liệu, đánh dấu ranh giới async sẽ chạy từng giai đoạn trong một diễn viên, chỉ cần tự hỏi nếu nó giống nhau. – jarvis11

Các vấn đề liên quan