2016-09-28 15 views
5

Hãy nói rằng tôi có một số iterator:Làm thế nào để đối phó với nguồn phát ra Tương lai [T]?

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...) 


Và tôi muốn xây dựng một nguồn tin từ iterator rằng:

val source: Source[Future[Int], NotUsed] = 
    Source.fromIterator(() => nextElemIter) 


Bây giờ nguồn của tôi phát ra Future s. Tôi chưa bao giờ thấy tương lai được thông qua giữa các giai đoạn trong tài liệu Akka hay bất cứ nơi nào khác, vì vậy thay vào đó, tôi có thể luôn luôn làm một cái gì đó như thế này:

val source: Source[Int, NotUsed] = 
    Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */) 


Và bây giờ tôi có một nguồn thường xuyên phát ra T thay vì Future[T]. Nhưng điều này cảm thấy bị hack và sai.

Cách thích hợp để giải quyết các tình huống như vậy là gì?

+3

Tôi nghĩ rằng 'mapAsync' là hoàn toàn tốt đẹp ở đây. Sau khi tất cả, nó được dự định chính xác cho mục đích này - làm phẳng tương lai vào suối. –

+1

'mapAsync (1) (nhận dạng)' là cách thích hợp để thực hiện nó. – expert

+0

@expert đã được chỉnh sửa. –

Trả lời

4

Trả lời câu hỏi của bạn trực tiếp: Tôi đồng ý với nhận xét của Vladimir rằng không có gì "hacky" về cách sử dụng mapAsync cho mục đích bạn mô tả. Tôi không thể nghĩ ra bất kỳ cách trực tiếp nào khác để tháo gỡ các số Future khỏi các giá trị Int bên dưới của bạn.

Trả lời câu hỏi của bạn gián tiếp ...

Cố gắng gắn bó với Futures

Streams, như một cơ chế đồng thời, là vô cùng hữu ích khi backpressure là bắt buộc. Tuy nhiên, hoạt động tinh khiết Future cũng có vị trí của chúng trong các ứng dụng.

Nếu Iterator[Future[Int]] của bạn sẽ tạo ra một số lượng giới hạn, được giới hạn, giá trị Future thì bạn có thể muốn sử dụng tương lai cho đồng thời.

Hãy tưởng tượng bạn muốn lọc, ánh xạ, & giảm giá trị Int.

def isGoodInt(i : Int) : Boolean = ???   //filter 
def transformInt(i : Int) : Int = ???   //map 
def combineInts(i : Int, j : Int) : Int = ??? //reduce 

Futures cung cấp một cách trực tiếp của việc sử dụng các chức năng:

val finalVal : Future[Int] = 
    Future sequence { 
    for { 
     f <- nextElemIter.toSeq //launch all of the Futures 
     i <- f if isGoodInt(i) 
    } yield transformInt(i) 
    } map (_ reduce combineInts) 

So với một cách hơi gián tiếp của việc sử dụng Stream như bạn đề nghị:

val finalVal : Future[Int] = 
    Source.fromIterator(() => nextElemIter) 
     .via(Flow[Future[Int]].mapAsync(1)(identity)) 
     .via(Flow[Int].filter(isGoodInt)) 
     .via(Flow[Int].map(transformInt)) 
     .to(Sink.reduce(combineInts)) 
     .run() 
Các vấn đề liên quan