2015-04-22 17 views
7

Tôi đang gọi một diễn viên sử dụng mẫu yêu cầu trong ứng dụng Phun và trả về kết quả là phản hồi HTTP. Tôi ánh xạ các lỗi từ diễn viên đến một mã lỗi tùy chỉnh.Giải quyết tương lai Akka từ yêu cầu trong trường hợp thất bại

val authActor = context.actorOf(Props[AuthenticationActor]) 

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user => 
    complete(StatusCodes.OK, user) 
} 

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = { 
onComplete(f) { 
    case Success(value: T) => cb(value) 
    case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage) 
    case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.") 
    //In reality this returns a custom error object. 
} 
} 

Điều này hoạt động chính xác khi authActor gửi một lỗi, nhưng nếu authActor ném ngoại lệ, không có gì xảy ra cho đến khi hết thời gian yêu cầu. Ví dụ:

override def receive: Receive = { 
    case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token") 
} 

Tôi biết rằng các tài liệu Akka nói rằng

Để hoàn thành tương lai với một ngoại lệ bạn cần gửi một thông điệp Không cho người gửi. Điều này không được thực hiện tự động khi một diễn viên ném một ngoại lệ trong khi xử lý tin nhắn.

Nhưng vì tôi sử dụng yêu cầu nhiều giao diện giữa các diễn viên định tuyến Phun và diễn viên dịch vụ, tôi không muốn quấn phần nhận của mỗi diễn viên trẻ bằng try/catch. Có cách nào tốt hơn để đạt được xử lý tự động các ngoại lệ trong các diễn viên trẻ và ngay lập tức giải quyết tương lai trong trường hợp ngoại lệ?

Chỉnh sửa: đây là giải pháp hiện tại của tôi. Tuy nhiên, nó khá lộn xộn để làm điều này cho mọi diễn viên trẻ.

override def receive: Receive = { 
case default => 
    try { 
    default match { 
     case _ => throw new ServiceException("")//Actual code would go here 
    } 
    } 
    catch { 
    case se: ServiceException => 
     logger.error("Service error raised:", se) 
     sender ! Failure(se) 
    case ex: Exception => 
     sender ! Failure(ex) 
     throw ex 
    } 
} 

Bằng cách đó nếu đó là lỗi dự kiến ​​(nghĩa là ServiceException), nó được xử lý bằng cách tạo lỗi. Nếu nó bất ngờ, nó trả về một thất bại ngay lập tức để tương lai được giải quyết, nhưng sau đó ném ngoại lệ để nó vẫn có thể được xử lý bởi SupervisorStrategy.

+0

Vâng ... gửi thông báo lỗi trước khi ném ngoại lệ. –

+0

Đó là những gì tôi không muốn làm - tôi nói ** Tôi không muốn quấn phần nhận của mỗi diễn viên trẻ với một thử/bắt **. Đây là một ví dụ đồ chơi, nó hoàn toàn có thể là tôi không kiểm soát nơi mà các ngoại lệ được ném. –

+0

Vâng ... bạn biết đấy ... một trong những con đường cơ bản của các hệ thống phân tán phục hồi là "làm cho các lỗi rõ ràng". Hãy nghĩ về các lỗi khác nhau có thể xảy ra ... làm cho chúng rõ ràng. Nếu bạn có thể có lỗi "TypeSafe" ... thậm chí tốt hơn. –

Trả lời

13

Nếu bạn muốn có một cách để cung cấp tự động gửi một phản ứng lại cho người gửi trong trường hợp ngoại lệ bất ngờ, sau đó một cái gì đó như thế này có thể làm việc cho bạn:

trait FailurePropatingActor extends Actor{ 
    override def preRestart(reason:Throwable, message:Option[Any]){ 
    super.preRestart(reason, message) 
    sender() ! Status.Failure(reason) 
    } 
} 

Chúng tôi ghi đè preRestart và tuyên truyền sự thất bại quay trở lại người gửi dưới dạng Status.Failure, điều này sẽ gây ra lỗi ngược dòng Future. Ngoài ra, điều quan trọng là gọi super.preRestart ở đây vì đó là nơi trẻ em dừng lại xảy ra. Sử dụng này trong một diễn viên trông giống như sau:

case class GetElement(list:List[Int], index:Int) 
class MySimpleActor extends FailurePropatingActor { 
    def receive = { 
    case GetElement(list, i) => 
     val result = list(i) 
     sender() ! result 
    } 
} 

Nếu tôi là để gọi một thể hiện của nam diễn viên này như sau:

import akka.pattern.ask 
import concurrent.duration._ 

val system = ActorSystem("test") 
import system.dispatcher 
implicit val timeout = Timeout(2 seconds) 
val ref = system.actorOf(Props[MySimpleActor]) 
val fut = ref ? GetElement(List(1,2,3), 6) 

fut onComplete{ 
    case util.Success(result) => 
    println(s"success: $result") 

    case util.Failure(ex) => 
    println(s"FAIL: ${ex.getMessage}") 
    ex.printStackTrace()  
}  

Sau đó, nó đúng cách sẽ đạt Failure khối của tôi. Bây giờ, mã trong đặc điểm cơ sở đó hoạt động tốt khi Futures không tham gia vào diễn viên đang mở rộng đặc điểm đó, giống như diễn viên đơn giản ở đây. Nhưng nếu bạn sử dụng Future giây thì bạn cần phải cẩn thận vì các ngoại lệ xảy ra trong Future không gây ra khởi động lại trong diễn viên và cũng có thể, trong preRestart, cuộc gọi tới sender() sẽ không trả về đúng ref vì nam diễn viên đã chuyển sang tin nhắn tiếp theo. Một diễn viên như thế này cho thấy vấn đề:

class MyBadFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 

    def receive = { 
    case GetElement(list, i) => 
     val orig = sender() 
     val fut = Future{ 
     val result = list(i) 
     orig ! result 
     }  
    } 
} 

Nếu chúng tôi sử dụng diễn viên này trong mã kiểm tra trước, chúng tôi sẽ luôn có thời gian chờ trong tình huống lỗi.Để giảm thiểu điều đó, bạn cần phải ống các kết quả của tương lai lại cho người gửi như vậy:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut pipeTo sender() 
    } 
} 

Trong trường hợp đặc biệt này, các diễn viên chính không được khởi động lại bởi vì nó đã không gặp phải một ngoại lệ còn tự do. Bây giờ, nếu diễn viên của bạn cần phải làm một số xử lý bổ sung sau khi tương lai, bạn có thể đường ống trở lại tự và rõ ràng thất bại khi bạn nhận được một Status.Failure:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2 

    case Status.Failure(ex) => 
     throw ex 
    } 
} 

Nếu hành vi đó trở nên phổ biến, bạn có thể làm cho nó có sẵn cho bất cứ điều gì diễn viên cần nó như vậy:

trait StatusFailureHandling{ me:Actor => 
    def failureHandling:Receive = { 
    case Status.Failure(ex) => 
     throw ex  
    } 
} 

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = myReceive orElse failureHandling 

    def myReceive:Receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2   
    } 
} 
+1

Điều này có vẻ tốt, cảm ơn! –

Các vấn đề liên quan