2015-10-21 24 views
7

Tôi đang làm việc trên ứng dụng Spark Streaming dựa trên Java, ứng dụng này phản hồi các thông điệp đi qua chủ đề Kafka. Đối với mỗi tin nhắn, ứng dụng thực hiện một số xử lý và ghi lại kết quả cho một chủ đề Kafka khác.Xử lý ngoại lệ không bắt buộc trong Spark

Đôi khi do các vấn đề liên quan đến dữ liệu không mong muốn, mã hoạt động trên RDD có thể thất bại và ném một ngoại lệ. Khi điều đó xảy ra, tôi muốn có một trình xử lý chung có thể thực hiện hành động cần thiết và thả một thông báo đến một chủ đề lỗi. Ngay bây giờ, những ngoại lệ này được viết trong bản ghi của Spark bởi chính Spark.

Cách tiếp cận tốt nhất để làm điều này là gì, thay vì viết các khối try-catch cho mỗi khối mã hoạt động trên RDD?

+0

tôi thấy rằng ai đó đã đúc một cuộc bỏ phiếu gần nói câu hỏi này là dựa vào ý kiến. Tôi sẽ đánh giá cao nếu các chuyên gia ít nhất có thể làm sáng tỏ chút ít trước khi bỏ phiếu bầu nếu điều này là không thể với Spark kể từ bây giờ. Việc bỏ phiếu kín mà không có giải thích sẽ không giúp cộng đồng theo bất kỳ cách nào. –

+0

Bạn có thể viết một hàm chung để thực hiện điều này. Bạn chỉ cần bọc nó xung quanh các hành động RDD vì đó là những người duy nhất có thể ném các ngoại lệ Spark (các máy biến thế như .map và .filter là những hành động lười biếng do các hành động). (Giả sử đây là trong Scala) Bạn có thể thậm chí có thể thử một cái gì đó với implicits và một lỗi xử lý làm giàu lớp RDD bạn tạo ra để ngầm thực thi errorhandling của bạn chỉ với chữ ký kiểu. Tôi đã không thực hiện cuộc bỏ phiếu gần, nhưng tôi tưởng tượng cách tiếp cận "tốt nhất" có phần chủ quan với nhu cầu ứng dụng. – Rich

+0

Cảm ơn @Rich. Vì vậy, về cơ bản những gì bạn có nghĩa là để nói là không có cách nào được xây dựng trong Spark như của bây giờ để xử lý này, vì vậy mỗi ứng dụng nên chăm sóc nó. Nếu bạn có thể đăng bình luận của bạn như một câu trả lời, tôi sẽ chấp nhận nó. –

Trả lời

3

Bạn có thể viết hàm chung để thực hiện việc này. Bạn chỉ cần quấn nó xung quanh các hành động RDD vì đó là những người duy nhất có thể ném các ngoại lệ Spark (các máy biến áp như .map.filter được thực thi bởi các hành động).

(Giả sử đây là trong Scala) Bạn thậm chí có thể thử một cái gì đó với implicits. Tạo một lớp chứa RDD và xử lý lỗi. Dưới đây là một phác thảo về những gì có thể trông giống như:

implicit class FailSafeRDD[T](rdd: RDD[T]) { 
    def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try { 
    fn(rdd) 
    } 
} 

Bạn có thể thêm lỗi tin nhắn chủ đề vào failsafeAction hoặc bất cứ điều gì bạn muốn làm mỗi khi vào thất bại. Và sau đó việc sử dụng có thể như sau:

val rdd = ??? // Some rdd you already have 
val resultOrException = rdd.failsafeAction { r => r.count() } 

Bên cạnh đó, tôi cho rằng cách tiếp cận "tốt nhất" là chủ quan với nhu cầu ứng dụng.

2

Tôi nghĩ bạn cũng có thể thực hiện điều này với một catch =>

dstream.foreachRDD { case rdd: RDD[String] => 
    rdd.foreach { case string: String => 
     try { 
     val kafkaProducer = ... 
     val msg = ... 
     kafkaProducer.send(msg) 
     } catch { 
     case d: DataException=> 
      val kafkaErrorProducer = ... 
      val errorMsg = ... 
      kafkaErrorProducer.send(errorMsg) 
     case t: Throwable => 
      //further error handling 
     } 
    } 
}