2013-08-04 29 views
12

Đôi khi tôi thấy mình trong tình huống có số Stream[X]function X => Future Y, tôi muốn kết hợp với một số Future[Stream[Y]] và dường như tôi không tìm được cách nào để làm điều đó. Ví dụ, tôi cólập bản đồ Luồng có chức năng trả về Tương lai

val x = (1 until 10).toStream 
def toFutureString(value : Integer) = Future(value toString) 

val result : Future[Stream[String]] = ??? 

tôi đã cố gắng

val result = Future.Traverse(x, toFutureString) 

mang đến cho kết quả chính xác, nhưng dường như tiêu thụ toàn bộ dòng trước khi trở về tương lai, trong đó nhiều hay thất bại ít purpse

Tôi đã thử

val result = x.flatMap(toFutureString) 

nhưng không biên dịch với type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString) 

trả về hơi kỳ quặc và vô dụng Stream[Future[String]]

Tôi nên làm gì đây để có được những thứ cố định?

Chỉnh sửa: Tôi không bị mắc kẹt trên một Stream, tôi muốn được bình đẳng hạnh phúc với những hoạt động tương tự trên một Iterator, miễn là nó sẽ không chặn trên đánh giá tất cả các mặt hàng trước khi bắt đầu để xử lý người đứng đầu

Chỉnh sửa2: Tôi không chắc chắn 100% rằng Future.Traverse xây dựng cần phải đi qua toàn bộ dòng trước khi trở về một [Stream] tương lai, nhưng tôi nghĩ rằng nó. Nếu không, đó là một câu trả lời tốt trong chính nó.

Chỉnh sửa3: Tôi cũng không cần kết quả theo thứ tự, tôi ổn với luồng hoặc trình lặp được trả lại là thứ tự bất kỳ.

+0

Lưu ý rằng tôi đã gửi [một vấn đề] (https://issues.scala-lang.org/browse/SI-7718) để theo dõi câu trả lời của tôi bên dưới. –

+0

ah, tuyệt vời @TravisBrown. Tôi muốn làm điều đó bản thân mình, nhưng tôi không thể tìm thấy một cách để đăng nhập vào Jira – Martijn

+0

Một chút không rõ ràng - bạn muốn tránh áp dụng "toFutureString" cho tất cả các yếu tố trong bộ sưu tập trước ...? Dường như không có nhiều chi phí để tạo ra một tương lai. Nếu các mục còn lại trong "danh sách" là các khối, điều gì sẽ kích hoạt đánh giá của họ? Hoàn thành tương lai trước đó trong danh sách? Tất cả các hoạt động tuần tự/truyền tải mà tôi có thể tìm thấy trong Scala dường như nghiêm ngặt đối với các yếu tố danh sách riêng lẻ. – pdxleif

Trả lời

9

Bạn đang đi đúng hướng với traverse, nhưng tiếc là có vẻ như định nghĩa của thư viện chuẩn bị hỏng một chút trong trường hợp này — không cần phải tiêu thụ luồng trước khi quay lại.

Future.traverse là một phiên bản cụ thể của một hàm tổng quát hơn nhiều hoạt động trên bất kỳ functor applicative bọc trong một loại "traversable" (xem thesepapers hoặc câu trả lời của tôi here để biết thêm thông tin, ví dụ).

Thư viện Scalaz cung cấp phiên bản tổng quát hơn này, và nó hoạt động như mong đợi trong trường hợp này (lưu ý rằng tôi nhận được dụ functor applicative cho Future từ scalaz-contrib; đó là chưa có trong các phiên bản ổn định của Scalaz, mà vẫn còn cross-xây dựng chống Scala 2.9.2, mà không có điều này Future):

import scala.concurrent._ 
import scalaz._, Scalaz._, scalaz.contrib.std._ 

import ExecutionContext.Implicits.global 

def toFutureString(value: Int) = Future(value.toString) 

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString 

này trả về ngay lập tức trên một dòng vô hạn, vì vậy chúng tôi biết chắc chắn rằng nó không phải là tiêu thụ đầu tiên.


Như một chú thích: Nếu bạn nhìn vào the source cho Future.traverse bạn sẽ thấy rằng nó thực hiện trong điều kiện của foldLeft, đó là thuận tiện, nhưng không cần thiết hoặc thích hợp trong trường hợp của con suối.

+0

FYI, Scala 2.9.3 chứa scala.concurrent –

+0

@ViktorKlang: Đúng vậy, và những trường hợp này [đang đến với Scalaz core] (https://github.com/scalaz/scalaz/wiki/Roadmap#sip-14) sớm, nhưng theo như tôi biết không có một thời gian cụ thể nào được nêu ra. –

+0

Mát mẻ, nghe có vẻ như một ý tưởng tốt –

2

Quên về Suối:

import scala.concurrent.Future 
import ExecutionContext.Implicits.global 

val x = 1 to 10 toList 
def toFutureString(value : Int) = Future { 
    println("starting " + value) 
    Thread.sleep(1000) 
    println("completed " + value) 
    value.toString 
} 

năng suất (trên hộp 8 lõi của tôi):

scala> Future.traverse(x)(toFutureString) 
starting 1 
starting 2 
starting 3 
starting 4 
starting 5 
starting 6 
starting 7 
starting 8 
res12: scala.concurrent.Future[List[String]] = [email protected] 

scala> completed 1 
completed 2 
starting 9 
starting 10 
completed 3 
completed 4 
completed 5 
completed 6 
completed 7 
completed 8 
completed 9 
completed 10 

Vì vậy, 8 trong số họ bị đá ra ngay lập tức (một cho mỗi lõi, mặc dù đó là cấu hình thông qua threadpool executor), và sau đó là những người hoàn chỉnh hơn được khởi động. Tương lai [List [String]] trả về ngay lập tức, và sau đó sau khi tạm dừng, nó bắt đầu in những thông báo "x hoàn thành" đó.

Việc sử dụng ví dụ này có thể là khi bạn có Danh sách [Url's] và chức năng của loại Url => Future [HttpResponseBody]. Bạn có thể gọi Future.traverse trên danh sách đó với hàm đó và khởi động các yêu cầu http đó song song, lấy lại một tương lai đơn lẻ đó là Danh sách kết quả.

Có phải thứ giống như những gì bạn đang làm không?

+0

Tôi đoán "không muốn các yếu tố dòng được đánh giá háo hức" có vẻ là mâu thuẫn với chủ nghĩa song song, khi những yếu tố đó là đầu vào cho nhiệm vụ bắt đầu tương lai. Bạn muốn Stream được tiêu thụ/đánh giá như thế nào? – pdxleif

Các vấn đề liên quan