Tôi có một ứng dụng Spark trong Scala lấy bản ghi từ Kafka cứ 10 giây một lần và lưu chúng dưới dạng tệp. Đây là dự án SBT và tôi chạy ứng dụng của mình với lệnh sbt run
. Mọi thứ hoạt động tốt cho đến khi tôi triển khai ứng dụng của mình trên Tomcat. Tôi quản lý để tạo tệp WAR với this plugin nhưng có vẻ như ứng dụng của tôi không làm bất cứ điều gì khi được triển khai trên Tomcat.
Đây là mã của tôi:Tại sao ứng dụng Spark Streaming hoạt động tốt bằng cách chạy sbt nhưng không hoạt động trên Tomcat (dưới dạng ứng dụng web)?
object SparkConsumer {
def main (args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("mytopic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value)).print
val arr = new ArrayBuffer[String]();
val lines = stream.map(record => (record.key, record.value));
stream.foreachRDD { rdd =>
if (rdd.count() > 0) {
val date = System.currentTimeMillis()
rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString)
rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) }
}
println("Stream had " + rdd.count() + " messages")
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
println(o)
}
}
stream.saveAsTextFiles("/tmp/output")
ssc.start()
ssc.awaitTermination()
}
}
Điều kỳ lạ là các ứng dụng hoạt động hoàn toàn tốt khi chạy qua sbt run
lệnh. Nó đọc các bản ghi từ Kafka đúng cách và lưu chúng dưới dạng các tệp trong thư mục mong muốn. Tôi không biết chuyện gì đang xảy ra. Tôi đã cố gắng để kích hoạt đăng nhập với log4j
nhưng nó thậm chí không đăng nhập bất cứ điều gì khi trên Tomcat. Tôi đã tìm kiếm câu trả lời nhưng chưa tìm ra giải pháp.
Tóm lại
My ứng dụng Scala Spark (đó là dự án SBT) nên đọc hồ sơ từ Kafka và lưu chúng như tập tin mỗi 10 giây. Nó hoạt động khi chạy qua lệnh sbt run
nhưng nó không khi triển khai trên Tomcat.
Thông tin thêm:
- Scala 2.12
- Tomcat 7
- SBT 0.13.15
- hỏi thêm
Q: là gì vấn đề?
Tôi sẽ suy đoán rằng có thể có các thư viện bị thiếu trong đường dẫn lớp. Bạn đã buộc gỡ lỗi ứng dụng từ xa chưa? – Serhiy