Tôi có một tia lửa chảy môi trường với tia lửa 1.2.0 nơi tôi lấy dữ liệu từ một thư mục địa phương và mỗi khi tôi tìm thấy một tập tin mới được thêm vào thư mục Tôi thực hiện một số chuyển đổi.viết một RDD vào HDFS trong một bối cảnh spark-streaming
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
Để thực hiện phân tích của tôi trên dữ liệu DStream tôi phải biến nó thành một mảng
var arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
Sau đó, tôi sử dụng dữ liệu thu được để trích xuất thông tin mà tôi muốn và lưu chúng trên HDFS.
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
Kể từ khi tôi thực sự cần phải thao tác dữ liệu với một mảng nó không thể lưu dữ liệu trên HDFS với DStream.saveAsTextFiles("...")
(mà sẽ làm việc tốt) và tôi phải lưu RDD nhưng với preocedure này cuối cùng tôi đã có tập tin đầu ra có sản phẩm nào được đặt tên bán 00000 vv ...
với một arr.foreach(println)
tôi có thể xem kết quả chính xác của các transofmations.
nghi ngờ của tôi là tia lửa mà cố gắng ở mỗi đợt ghi dữ liệu trong cùng một file, xóa những gì đã được viết trước đó. Tôi đã cố gắng lưu trong một thư mục có tên động như myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())
nhưng luôn luôn chỉ có một nếp gấp được tạo và các tệp đầu ra vẫn trống.
Làm thế nào tôi có thể viết một RDD vào HDFS trong một bối cảnh spark-streaming?
Tôi đoán vấn đề là arr bạn không có sẵn trên tất cả các công nhân. Bạn đã cố gắng để phát sóng arr của bạn và sau đó cuối cùng đã viết nó vào hdfs? –
vì tôi cần theo dõi một thư mục và chặn tất cả các tệp mới được tải lên và phát trực tuyến âm thanh như một giải pháp tốt. Nó không phải là một máy đơn lẻ mà là một cụm máy 2. Bây giờ tôi chỉ viết các tập tin dưới dạng văn bản nhưng trong tương lai tôi sẽ phải viết các tập tin lát gỗ và nó khá đơn giản với Spark – drstein
Bạn sẽ thử điều này? var arr = new ArrayBuffer [String](); val phát sóng = sc.broadcast (arr) data.foreachRDD { phát sóng ++ = _.collect() } val myRDD = sc.parallelize (phát sóng) myRDD.saveAsTextFile ("HDFS thư mục ....") –