Chữ ký
Sự khác biệt được đánh dấu tốt nhất trong signatures: Flow.map
mất trong một hàm trả về một loại T
khi Flow.mapAsync
mất trong một hàm trả về một loại Future[T]
.
thực tiễn Ví dụ
Như một ví dụ, giả sử rằng chúng ta có một chức năng mà các truy vấn cơ sở dữ liệu cho tên đầy đủ của người dùng dựa trên một user id:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
Cho một Source
của UserID
giá trị chúng tôi chỉ có thể sử dụng Flow.map
trong Luồng để truy vấn cơ sở dữ liệu và in tên đầy đủ vào bảng điều khiển:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
Một hạn chế của việc triển khai này là luồng này sẽ chỉ thực hiện 1 truy vấn db tại một thời điểm. Đây sẽ là "nút cổ chai" và có khả năng ngăn thông lượng tối đa trong luồng của chúng tôi. Để cải thiện hiệu suất, chúng tôi chỉ đơn giản là có thể thêm đồng thời bằng cách gói databaseLookup
bên trong một Future
:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
Vấn đề với phụ lục đồng thời đơn giản này là chúng ta đã loại bỏ một cách hiệu quả backpressure. Kể từ khi Sink chỉ là kéo trong tương lai và thêm một foreach println
, mà là tương đối nhanh so với truy vấn cơ sở dữ liệu, dòng sẽ liên tục tuyên truyền nhu cầu đến nguồn và đẻ trứng ra tương lai nhiều hơn nữa. Điều này có nghĩa là không có giới hạn về số lượng databaseLookup
chạy đồng thời, mà cuối cùng có thể làm thay đổi cơ sở dữ liệu.
Flow.mapAsync
để giải cứu; chúng ta có thể tra cứu db đồng thời trong khi cùng một lúc đóng nắp số lượng tra cứu đồng thời:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
Cũng lưu ý rằng Sink.foreach
đã đơn giản hơn, nó không còn mất trong một Future[FullName]
nhưng chỉ là một FullName
để thay thế.
Unordered Async Bản đồ
Nếu bạn không quan tâm đến việc duy trì trật tự của userIds để FullNames bạn có thể sử dụng Flow.mapAsyncUnordered
. Điều này sẽ hữu ích nếu tất cả những gì bạn quan tâm là in tất cả các tên đầy đủ nhưng không quan tâm đến thứ tự mà họ đến bàn điều khiển.
Là 'mapAsync' tương tự như áp dụng ranh giới không đồng bộ cho giai đoạn cụ thể đó không? Theo tài liệu, đánh dấu ranh giới async sẽ chạy từng giai đoạn trong một diễn viên, chỉ cần tự hỏi nếu nó giống nhau. – jarvis11