2013-04-15 30 views
12

Đây là lần theo dõi my previous question.Hủy tương lai và hứa hẹn tại Scala

Giả sử tôi có nhiệm vụ thực hiện cuộc gọi chặn gián đoạn. Tôi muốn chạy dưới dạng số Futurehủy bằng phương thức failurePromise.

Tôi muốn các hủy để làm việc như sau:

  • Nếu một hủy nhiệm vụ trước nó xong tôi muốn các nhiệm vụ để hoàn thành "ngay lập tức", làm gián đoạn cuộc gọi chặn nếu nó đã bắt đầu và tôi muốn Future để gọi onFailure.

  • Nếu một hủy nhiệm vụ sau nhiệm vụ kết thúc, tôi muốn có được một tình trạng nói rằng hủy thất bại kể từ khi nhiệm vụ đã hoàn thành.

Có hợp lý không? Có thể thực hiện trong Scala không? Có bất kỳ ví dụ nào về việc triển khai như vậy không?

Trả lời

12

scala.concurrent.Future là chỉ đọc, do đó, một người đọc không thể gây rối cho người đọc khác.

Nó có vẻ như bạn sẽ có thể thực hiện những gì bạn muốn như sau:

def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T],() => Boolean) = { 
    val p = Promise[T]() 
    val f = p.future 
    p tryCompleteWith Future(fun(f)) 
    (f,() => p.tryFailure(new CancellationException)) 
} 

val (f, cancel) = cancellableFuture(future => { 
    while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag 

    result // when we're done, return some result 
}) 

val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally) 
+0

Cảm ơn. Giả sử tôi thực hiện một số cuộc gọi chặn _interruptible_ thay vì tính toán. Làm cách nào tôi có thể sửa đổi mã ở trên để làm gián đoạn chuỗi? – Michael

+2

Bạn sẽ phải thêm một var đồng bộ đặt chuỗi hiện tại dưới một khóa khi tính toán bắt đầu, và sau đó lấy khóa ở cuối và xóa var. Và hủy bỏ sẽ lấy khóa và gọi ngắt trên các chủ đề thiết lập, nếu có, hoặc bảo lãnh nếu null. –

+0

phải là 'while (! Future.isCompleted && moreWork) continueCalculation'? – sourcedelica

9

Đây là phiên bản gián đoạn mã của Victor cho mỗi nhận xét của anh ấy (Victor, hãy sửa tôi nếu tôi hiểu sai).

object CancellableFuture extends App { 

    def interruptableFuture[T](fun:() => T)(implicit ex: ExecutionContext): (Future[T],() => Boolean) = { 
    val p = Promise[T]() 
    val f = p.future 
    val aref = new AtomicReference[Thread](null) 
    p tryCompleteWith Future { 
     val thread = Thread.currentThread 
     aref.synchronized { aref.set(thread) } 
     try fun() finally { 
     val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread 
     //Deal with interrupted flag of this thread in desired 
     } 
    } 

    (f,() => { 
     aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } } 
     p.tryFailure(new CancellationException) 
    }) 
    } 

    val (f, cancel) = interruptableFuture[Int] {() => 
    val latch = new CountDownLatch(1) 

    latch.await(5, TimeUnit.SECONDS) // Blocks for 5 sec, is interruptable 
    println("latch timed out") 

    42 // Completed 
    } 

    f.onFailure { case ex => println(ex.getClass) } 
    f.onSuccess { case i => println(i) } 

    Thread.sleep(6000) // Set to less than 5000 to cancel 

    val wasCancelled = cancel() 

    println("wasCancelled: " + wasCancelled) 
} 

Với Thread.sleep(6000) đầu ra là:

latch timed out 
42 
wasCancelled: false 

Với Thread.sleep(1000) đầu ra là:

wasCancelled: true 
class java.util.concurrent.CancellationException 
+0

được đăng lên https://gist.github.com/ericacm/5401303 – sourcedelica

+0

Tôi có ý nghĩa như sau: https://gist.github.com/viktorklang/5409467 –

+0

Cảm ơn bạn. Đó chắc chắn là sạch hơn. – sourcedelica

Các vấn đề liên quan