2011-09-03 34 views
12

giả sử tôi có một chức năng ngủ:Chuyển đổi Scala @suspendable Phương pháp vào một tương lai

def sleep(delay:Int) : Unit @suspendable = { 
    .... 
} 

là nó có thể có một chức năng tương lai mà tạo ra một phiên bản async của hàm ngủ có thể được chờ đợi trên cách đồng bộ.

def future(targetFunc: (Int => Unit @suspendable)) : (Int => Future) = { 
    .... 
} 

class Future { 
    def await : Unit @suspendable = { 
    .... 
    } 
} 

bạn sẽ có thể làm điều gì đó như thế này:

reset { 
    val sleepAsync = future(sleep) 
    val future1 = sleepAsync(2000) 
    val future2 = sleepAsync(3000) 
    future1.await 
    future2.await 
    /* finishes after a delay of 3000 */ 
} 

hai cuộc gọi đến sleepAsync sẽ xuất hiện trở lại ngay lập tức và cả hai cuộc gọi đến trong tương lai # chờ đợi sẽ xuất hiện để ngăn chặn. tất nhiên họ đều thực sự rơi vào cuối của thiết lập lại và mã sau khi chịu trách nhiệm gọi sự tiếp tục sau khi sự chậm trễ.

nếu không có phương pháp thay thế nào để chạy hai hàm @suspendable song song và chờ cho cả hai hàm hoàn thành?

Tôi có một ý chính compilable với một bộ xương của những gì tôi muốn làm: https://gist.github.com/1191381

+0

tôi đã viết: https://gist.github.com/1191571 có vẻ như hoạt động nhưng có vẻ khá phức tạp. tôi cảm thấy như tôi có thể thiếu một cách đơn giản hơn để làm điều đó. – benmmurphy

+0

cũng tìm thấy điều này: http://days2011.scala-lang.org/node/138/288 mà dường như làm điều đó độc đáo hơn nhiều. – benmmurphy

+0

Bạn có ưu tiên cho câu trả lời "thắng" không? Tôi cần đưa ra giải thưởng tiền thưởng. –

Trả lời

1

Tôi không chắc rằng tôi hoàn toàn hiểu được những câu hỏi, nhưng đây là một thử:

import scala.util.continuations._ 

class Future(thread: Thread) { 
    def await = thread.join 
} 

object Future { 

    def sleep(delay: Long) = Thread.sleep(delay) 

    def future[A,B](f: A => B) = (a: A) => shift { k: (Future => Unit) => 
    val thread = new Thread { override def run() { f(a) } } 
    thread.start() 

    k(new Future(thread)) 
    } 

    def main(args:Array[String]) = reset { 
    val sleepAsync = future(sleep) 
    val future1 = sleepAsync(2000) // returns right away 
    val future2 = sleepAsync(3000) // returns right away 
    future1.await // returns after two seconds 
    future2.await // returns after an additional one second 
    // finished after a total delay of three seconds 
    } 
} 

Ở đây, một cá thể Future không gì khác hơn là một tay cầm trên Thread, vì vậy bạn có thể sử dụng phương thức join để chặn cho đến khi nó kết thúc.

Chức năng future mất một chức năng của loại A => B, và trả về một chức năng mà, khi được cung cấp với một A sẽ khởi động một thread để chạy "futured" chức năng, và quấn nó lên trong một Future, được tiêm trở lại vào phần tiếp tục, do đó gán nó cho val future1.

Đây có phải là nơi gần gũi với những gì bạn đang làm không?

+0

tôi muốn sử dụng liên tục thay vì các chủ đề – benmmurphy

+0

Bạn có thể chạy liên tục theo ý bạn, nhưng bằng cách nào đó chúng cần phải chạy khỏi luồng hiện tại (nếu không tổng thời gian chạy là 5000 ms thay vì 3000 ms). Trong thực tế, bạn có thể sử dụng một hồ bơi thread thay vì làm cho các trường hợp 'Thread' của riêng bạn. Bạn muốn chạy 'future1' và' future2' như thế nào? – earldouglas

+0

ah. tôi muốn tương lai để có thể có một chức năng đình chỉ không phải là một chức năng bình thường – benmmurphy

2
object Forks { 

    import scala.util.continuations._ 

    case class Forker(forks: Vector[() => Unit @suspendable]) { 
    def ~(block: => Unit @suspendable): Forker = Forker(forks :+ (() => block)) 
    def joinIf(pred: Int => Boolean): Unit @suspendable = shift { k: (Unit => Unit) => 
     val counter = new java.util.concurrent.atomic.AtomicInteger(forks.size) 
     forks foreach { f => 
     reset { 
      f() 
      if (pred(counter.decrementAndGet)) k() 
     } 
     } 
    } 
    def joinAll() = joinIf(_ == 0) 
    def joinAny() = joinIf(_ == forks.size - 1) 
    } 

    def fork(block: => Unit @suspendable): Forker = Forker(Vector(() => block)) 
} 

sử dụng fork(), giờ đây chúng tôi có thể đợi nhiều "tạm ngưng". sử dụng ~() để kết nối các chuỗi có thể tạm ngưng với nhau. sử dụng joinAll() để chờ tất cả các suspendables và joinAny() để đợi chỉ một. sử dụng joinIf() để tùy chỉnh chiến lược tham gia.

object Tests extends App { 

    import java.util.{Timer, TimerTask} 
    import scala.util.continuations._ 

    implicit val timer = new Timer 

    def sleep(ms: Int)(implicit timer: Timer): Unit @suspendable = { 
    shift { k: (Unit => Unit) => 
     timer.schedule(new TimerTask { 
     def run = k() 
     }, ms) 
    } 
    } 

    import Forks._ 

    reset { 
    fork { 
     println("sleeping for 2000 ms") 
     sleep(2000) 
     println("slept for 2000 ms") 
    } ~ { 
     println("sleeping for 4000 ms") 
     sleep(4000) 
     println("slept for 4000 ms") 
    } joinAll() 
    println("and we are done") 
    } 
    println("outside reset") 
    readLine 
    timer.cancel 
} 

và đây là đầu ra. chương trình bắt đầu vào thời gian T:

sleeping for 2000 ms 
sleeping for 4000 ms 
outside reset   <<<<<< T + 0 second 
slept for 2000 ms  <<<<<< T + 2 seconds 
slept for 4000 ms  <<<<<< T + 4 seconds 
and we are done  <<<<<< T + 4 seconds 
Các vấn đề liên quan