2013-07-16 39 views
5

Có một số gợi ý tốt về cách kết hợp tương lai with timeouts. Tuy nhiên tôi tò mò làm thế nào để làm điều này với sequenceOfFutures chuỗi FutureXử lý chuỗi thời gian chờ và tương lai Scala

tiếp cận đầu tiên của tôi trông như thế này

import scala.concurrent._ 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits._ 

object FutureSequenceScala extends App { 
    println("Creating futureList") 

    val timeout = 2 seconds 
    val futures = List(1000, 1500, 1200, 800, 2000) map { ms => 
    val f = future { 
     Thread sleep ms 
     ms toString 
    } 
    Future firstCompletedOf Seq(f, fallback(timeout)) 
    } 

    println("Creating waitinglist") 
    val waitingList = Future sequence futures 
    println("Created") 

    val results = Await result (waitingList, timeout * futures.size) 
    println(results) 

    def fallback(timeout: Duration) = future { 
    Thread sleep (timeout toMillis) 
    "-1" 
    } 
} 

Có cách nào tốt hơn để xử lý timeout trong một chuỗi các kỳ hạn hoặc là một này giải pháp hợp lệ?

Trả lời

8

Có một vài điều trong mã của bạn ở đây mà bạn có thể muốn xem xét lại. Để bắt đầu, tôi không phải là một fan hâm mộ lớn của các nhiệm vụ gửi vào ExecutionContext có mục đích duy nhất là mô phỏng một thời gian chờ và cũng có Thread.sleep được sử dụng trong chúng. Cuộc gọi sleep đang chặn và bạn có thể muốn tránh việc thực hiện tác vụ trong ngữ cảnh thực thi hoàn toàn ngăn chặn để chờ một khoảng thời gian cố định. Tôi sẽ ăn cắp từ câu trả lời của tôi here và đề nghị rằng để xử lý thời gian chờ tinh khiết, bạn nên sử dụng một cái gì đó như tôi đã nêu trong câu trả lời đó. Các HashedWheelTimer là một thực hiện bộ đếm thời gian hiệu quả cao đó là mush tốt hơn phù hợp với xử lý thời gian chờ hơn là một nhiệm vụ mà chỉ ngủ.

Bây giờ, nếu bạn đi tuyến đường đó, thay đổi tiếp theo tôi sẽ đề xuất các mối quan tâm xử lý các lỗi liên quan đến thời gian chờ cá nhân cho từng tương lai. Nếu bạn muốn một cá nhân thất bại hoàn toàn thất bại tổng hợp Future trả lại từ cuộc gọi sequence, sau đó không làm gì thêm. Nếu bạn không muốn điều đó xảy ra, và thay vào đó muốn có một thời gian chờ để trở lại một số giá trị mặc định thay vào đó, sau đó bạn có thể sử dụng recover trên Future như thế này:

withTimeout(someFuture).recover{ 
    case ex:TimeoutException => someDefaultValue 
} 

Một khi bạn đã làm điều đó, bạn có thể mang lợi thế của các cuộc gọi lại không chặn và thực hiện như sau:

waitingList onComplete{ 
    case Success(results) => //handle success 
    case Failure(ex) => //handle fail 
} 

Mỗi tương lai có thời gian chờ và do đó sẽ không chỉ chạy vô hạn. Không cần IMO để chặn ở đó và cung cấp thêm một lớp xử lý thời gian chờ qua thông số atMost tới Await.result. Nhưng tôi đoán điều này giả định bạn là okay với cách tiếp cận không chặn. Nếu bạn thực sự cần phải chặn ở đó, sau đó bạn không nên chờ đợi timeout * futures.size lượng thời gian. Những tương lai này đang chạy song song; thời gian chờ có chỉ cần phải miễn là thời gian chờ cá nhân cho bản thân tương lai (hoặc chỉ dài hơn một chút để giải thích cho bất kỳ sự chậm trễ nào trong cpu/thời gian). Nó chắc chắn không nên là thời gian chờ * tổng số tương lai.

+0

Vì sự tò mò, cách 'HashedWheelTimer' hiệu quả hơn' TimerTask' hoặc 'newScheduledThreadPoolExecutor'? Cả hai đều làm cùng một công việc. – Jatin

+0

@Jatin, tôi cho rằng bạn có thể xem liên kết này để biết thêm thông tin: http://stackoverflow.com/questions/15347600/which-is-more-efficient-nettys-hashedwheeltimer-or-quartzs-scheduler. Nhưng ở trung tâm của nó, việc thêm nhiều nhiệm vụ hơn không nên tiêu tốn nhiều tài nguyên hơn. Nó được coi là một thời gian liên tục hơn (về nguồn tài nguyên hệ thống tiêu thụ) dựa trên bộ đếm thời gian sau đó một cái gì đó giống như một 'Timer' và' TimerTask'. Đối với một hệ thống thông lượng cao, nơi bạn sẽ lên kế hoạch rất nhiều và rất nhiều thời gian chờ ngắn dựa trên nhiệm vụ, đó là một giải pháp tốt hơn vì các tuyên bố sử dụng tài nguyên liên tục. – cmbaxter

+0

Nhưng cách 'STPE' với' coresize' '1' tiêu thụ nhiều tài nguyên hơn khi so sánh với' HashedWheelTimer' như thế nào? Tôi xin lỗi nhưng tôi không hiểu. 'STPE' có thời gian chèn thêm do heap nội bộ' O (log (n)) 'nhưng thời gian đánh dấu ít hơn. Bạn có thể vui lòng giải thích – Jatin

1

Đây là phiên bản hiển thị mức độ chặn của bạn fallback.

Lưu ý rằng người thực thi là một chuỗi duy nhất và bạn đang tạo nhiều hạn chế.

@cmbaxter đúng, thời gian chờ chính của bạn không được là timeout * futures.size, nó phải lớn hơn!

@cmbaxter cũng đúng mà bạn muốn nghĩ là không bị chặn. Một khi bạn làm điều đó, và bạn muốn áp đặt timeouts, sau đó bạn sẽ chọn một thành phần bộ đếm thời gian cho điều đó, xem câu trả lời được liên kết của mình (cũng được liên kết từ câu trả lời được liên kết của bạn).

Điều đó nói rằng, tôi vẫn thích my answer from your link, cho đến nay khi ngồi trong một vòng lặp chờ đợi điều tiếp theo nên thời gian chờ thực sự đơn giản.

Chỉ cần danh sách tương lai và thời gian chờ của chúng và giá trị dự phòng.

Có lẽ đó là một trường hợp sử dụng cho điều đó, chẳng hạn như một ứng dụng đơn giản rằng chỉ cần khối đối với một số kết quả (như thử nghiệm của bạn) và không thoát ra trước khi kết quả là trong

import scala.concurrent._ 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext 

import java.util.concurrent.Executors 
import java.lang.System.{ nanoTime => now } 

object Test extends App { 
    //implicit val xc = ExecutionContext.global 
    implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor) 

    def timed[A](body: =>A): A = { 
    val start = now 
    val res = body 
    val end = now 
    Console println (Duration fromNanos end-start).toMillis + " " + res 
    res 
    } 
    println("Creating futureList") 

    val timeout = 1500 millis 
    val futures = List(1000, 1500, 1200, 800, 2000) map { ms => 
    val f = future { 
     timed { 
     blocking(Thread sleep ms) 
     ms toString 
     } 
    } 
    Future firstCompletedOf Seq(f, fallback(timeout)) 
    } 

    println("Creating waitinglist") 
    val waitingList = Future sequence futures 
    println("Created") 

    timed { 
    val results = Await result (waitingList, 2 * timeout * futures.size) 
    println(results) 
    }  
    xc.shutdown 

    def fallback(timeout: Duration) = future { 
    timed { 
     blocking(Thread sleep (timeout toMillis)) 
     "-1" 
    } 
    } 
} 

gì đã xảy ra:.

Creating futureList 
Creating waitinglist 
Created 
1001 1000 
1500 -1 
1500 1500 
1500 -1 
1200 1200 
1500 -1 
800 800 
1500 -1 
2000 2000 
1500 -1 
List(1000, 1500, 1200, 800, 2000) 
14007() 
Các vấn đề liên quan