2017-05-18 16 views
5

Tôi có một ứng dụng Spark trong Scala lấy bản ghi từ Kafka cứ 10 giây một lần và lưu chúng dưới dạng tệp. Đây là dự án SBT và tôi chạy ứng dụng của mình với lệnh sbt run. Mọi thứ hoạt động tốt cho đến khi tôi triển khai ứng dụng của mình trên Tomcat. Tôi quản lý để tạo tệp WAR với this plugin nhưng có vẻ như ứng dụng của tôi không làm bất cứ điều gì khi được triển khai trên Tomcat.
Đây là mã của tôi:Tại sao ứng dụng Spark Streaming hoạt động tốt bằng cách chạy sbt nhưng không hoạt động trên Tomcat (dưới dạng ứng dụng web)?

object SparkConsumer { 
    def main (args: Array[String]) { 

    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver") 
    val ssc = new StreamingContext(conf, Seconds(10)) 


    val kafkaParams = Map[String, Object](
     "bootstrap.servers" -> "localhost:9092", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> "group_id", 
     "auto.offset.reset" -> "latest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    val topics = Array("mytopic") 
    val stream = KafkaUtils.createDirectStream[String, String](
     ssc, 
     PreferConsistent, 
     Subscribe[String, String](topics, kafkaParams) 
    ) 

    stream.map(record => (record.key, record.value)).print 

    val arr = new ArrayBuffer[String](); 

    val lines = stream.map(record => (record.key, record.value)); 

    stream.foreachRDD { rdd => 

     if (rdd.count() > 0) { 
      val date = System.currentTimeMillis() 
      rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString) 
      rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) } 
     } 

     println("Stream had " + rdd.count() + " messages") 

     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
     rdd.foreachPartition { iter => 
     val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 
     println(o) 
     } 
    } 

    stream.saveAsTextFiles("/tmp/output") 


    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

Điều kỳ lạ là các ứng dụng hoạt động hoàn toàn tốt khi chạy qua sbt run lệnh. Nó đọc các bản ghi từ Kafka đúng cách và lưu chúng dưới dạng các tệp trong thư mục mong muốn. Tôi không biết chuyện gì đang xảy ra. Tôi đã cố gắng để kích hoạt đăng nhập với log4j nhưng nó thậm chí không đăng nhập bất cứ điều gì khi trên Tomcat. Tôi đã tìm kiếm câu trả lời nhưng chưa tìm ra giải pháp.

Tóm lại

My ứng dụng Scala Spark (đó là dự án SBT) nên đọc hồ sơ từ Kafka và lưu chúng như tập tin mỗi 10 giây. Nó hoạt động khi chạy qua lệnh sbt run nhưng nó không khi triển khai trên Tomcat.

Thông tin thêm:

  • Scala 2.12
  • Tomcat 7
  • SBT 0.13.15
  • hỏi thêm

Q: là gì vấn đề?

+0

Tôi sẽ suy đoán rằng có thể có các thư viện bị thiếu trong đường dẫn lớp. Bạn đã buộc gỡ lỗi ứng dụng từ xa chưa? – Serhiy

Trả lời

1

tl; dr Ứng dụng độc lập SparkConsumer hoạt động đúng trên Tomcat và bản thân Tomcat cũng vậy.

Tôi rất ngạc nhiên khi đã đọc câu hỏi vì mã của bạn không phải là thứ tôi mong đợi làm việc bao giờ trên Tomcat. Lấy làm tiếc.

Tomcat là một thùng chứa servlet và như vậy yêu cầu các servlet trong một ứng dụng web.

Mặc dù bạn đã tạo WAR và triển khai nó cho Tomcat, bạn không "kích hoạt" bất kỳ thứ gì từ ứng dụng web này để khởi động ứng dụng Spark Streaming (mã bên trong phương thức main).

Ứng dụng Spark Streaming hoạt động tốt khi được thực hiện bằng cách sử dụng sbt run vì đó là mục tiêu của sbt run, tức là thực thi ứng dụng độc lập trong một dự án do sbt quản lý.

Với bạn chỉ có một ứng dụng độc lập trong dự án sbt của mình, sbt run đã quản lý để tìm SparkConsumer và thực hiện phương thức nhập main của nó. Không có gì ngạc nhiên ở đây.

Tuy nhiên, nó sẽ không hoạt động trên Tomcat. Bạn sẽ phải trưng ra ứng dụng dưới dạng POST hoặc GET endpoint và sử dụng một HTTP client (một trình duyệt hoặc công cụ dòng lệnh như curl, wget hoặc httpie) để thực thi nó.

Spark không hỗ trợ Scala 2.12 vì vậy ... làm cách nào bạn quản lý để sử dụng phiên bản Scala với Spark ?! Không thể!

Các vấn đề liên quan