2017-02-21 43 views
9

Tôi muốn có một diễn viên tiêu dùng đăng ký chủ đề Kafka và dữ liệu luồng để xử lý thêm với Spark Streaming bên ngoài người tiêu dùng. Tại sao một diễn viên? Bởi vì tôi đọc rằng chiến lược giám sát của nó sẽ là một cách tuyệt vời để xử lý thất bại Kafka (ví dụ, khởi động lại thất bại).Phát trực tiếp từ một diễn viên

tôi thấy hai lựa chọn:

  • Java KafkaConsumer lớp: Phương pháp poll() nó trả về một Map[String, Object]. Tôi muốn một số DStream được trả lại giống như KafkaUtils.createDirectStream và tôi không biết cách tìm nạp luồng từ bên ngoài diễn viên.
  • Kéo dài đặc điểm ActorHelper và sử dụng actorStream() như được hiển thị trong số example này. Tùy chọn thứ hai này không hiển thị kết nối với một chủ đề mà là một ổ cắm.

Mọi người có thể chỉ cho tôi đúng hướng không?

Trả lời

2

Đối xử lý thất bại Kafka, tôi đã sử dụng khuôn khổ Apache Phụ trách và thực hiện giải pháp sau đây:

val client: CuratorFramework = ... // see docs 
val zk: CuratorZookeeperClient = client.getZookeeperClient 

/** 
    * This method returns false if kafka or zookeeper is down. 
    */ 
def isKafkaAvailable:Boolean = 
    Try { 
     if (zk.isConnected) { 
     val xs = client.getChildren.forPath("/brokers/ids") 
     xs.size() > 0 
     } 
     else false 
    }.getOrElse(false) 

Đối với tiêu thụ chủ đề Kafka, tôi đã sử dụng thư viện com.softwaremill.reactivekafka. Ví dụ:

class KafkaConsumerActor extends Actor { 
    val kafka = new ReactiveKafka() 
    val config: ConsumerProperties[Array[Byte], Any] = ... // see docs 

    override def preStart(): Unit = { 
     super.preStart() 

     val publisher = kafka.consume(config) 
     Source.fromPublisher(publisher) 
      .map(handleKafkaRecord) 
      .to(Sink.ignore).run() 
    } 

    /** 
    * This method will be invoked when any kafka records will happen. 
    */ 
    def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = { 
     // handle record 
    } 
} 
Các vấn đề liên quan