2015-07-02 15 views
7

Tôi có một tia lửa chảy môi trường với tia lửa 1.2.0 nơi tôi lấy dữ liệu từ một thư mục địa phương và mỗi khi tôi tìm thấy một tập tin mới được thêm vào thư mục Tôi thực hiện một số chuyển đổi.viết một RDD vào HDFS trong một bối cảnh spark-streaming

val ssc = new StreamingContext(sc, Seconds(10)) 
val data = ssc.textFileStream(directory) 

Để thực hiện phân tích của tôi trên dữ liệu DStream tôi phải biến nó thành một mảng

var arr = new ArrayBuffer[String](); 
    data.foreachRDD { 
    arr ++= _.collect() 
} 

Sau đó, tôi sử dụng dữ liệu thu được để trích xuất thông tin mà tôi muốn và lưu chúng trên HDFS.

val myRDD = sc.parallelize(arr) 
myRDD.saveAsTextFile("hdfs directory....") 

Kể từ khi tôi thực sự cần phải thao tác dữ liệu với một mảng nó không thể lưu dữ liệu trên HDFS với DStream.saveAsTextFiles("...") (mà sẽ làm việc tốt) và tôi phải lưu RDD nhưng với preocedure này cuối cùng tôi đã có tập tin đầu ra có sản phẩm nào được đặt tên bán 00000 vv ...

với một arr.foreach(println) tôi có thể xem kết quả chính xác của các transofmations.

nghi ngờ của tôi là tia lửa mà cố gắng ở mỗi đợt ghi dữ liệu trong cùng một file, xóa những gì đã được viết trước đó. Tôi đã cố gắng lưu trong một thư mục có tên động như myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()) nhưng luôn luôn chỉ có một nếp gấp được tạo và các tệp đầu ra vẫn trống.

Làm thế nào tôi có thể viết một RDD vào HDFS trong một bối cảnh spark-streaming?

+0

Tôi đoán vấn đề là arr bạn không có sẵn trên tất cả các công nhân. Bạn đã cố gắng để phát sóng arr của bạn và sau đó cuối cùng đã viết nó vào hdfs? –

+0

vì tôi cần theo dõi một thư mục và chặn tất cả các tệp mới được tải lên và phát trực tuyến âm thanh như một giải pháp tốt. Nó không phải là một máy đơn lẻ mà là một cụm máy 2. Bây giờ tôi chỉ viết các tập tin dưới dạng văn bản nhưng trong tương lai tôi sẽ phải viết các tập tin lát gỗ và nó khá đơn giản với Spark – drstein

+0

Bạn sẽ thử điều này? var arr = new ArrayBuffer [String](); val phát sóng = sc.broadcast (arr) data.foreachRDD { phát sóng ++ = _.collect() } val myRDD = sc.parallelize (phát sóng) myRDD.saveAsTextFile ("HDFS thư mục ....") –

Trả lời

5

Bạn đang sử dụng Spark streaming trong một cách thức mà nó không được thiết kế. Tôi khuyên bạn nên sử dụng Spark để sử dụng cho trường hợp sử dụng của bạn hoặc điều chỉnh mã của bạn để nó hoạt động theo cách Spark. Thu thập mảng cho người lái xe đánh bại mục đích sử dụng một công cụ phân tán và làm cho ứng dụng của bạn có hiệu quả một máy (hai máy cũng sẽ gây ra nhiều chi phí hơn là chỉ xử lý dữ liệu trên một máy).

Tất cả những gì bạn có thể làm với một mảng, bạn có thể làm với Spark. Vì vậy, chỉ cần chạy tính toán của bạn bên trong dòng, phân phối trên các công nhân, và viết đầu ra của bạn bằng cách sử dụng DStream.saveAsTextFiles(). Bạn có thể sử dụng foreachRDD + saveAsParquet(path, overwrite = true) để ghi vào một tệp.

+0

Cảm ơn, tôi hoàn toàn nhận được quan điểm của bạn, tôi sẽ cố gắng thay đổi logic trasform để sử dụng DStream. Bạn có biết nếu có thể phát trực tiếp tia lửa trong mỗi lô để lưu bản ghi trong cùng một tệp không? Ngay bây giờ tôi nhận được một thư mục mới với các tập tin mới mỗi khoảng thời gian hàng loạt. – drstein

+1

Có, với foreachRDD + saveAsParquet có một tùy chọn để ghi đè lên. –

+0

@MariusSoutier, bạn có thể vui lòng giúp tôi với 'http: // stackoverflow.com/questions/39363586/issue-while-storage-data-from-spark-streaming-to-cassanadra' này không – Naresh

2

@vzamboni: Spark 1.5 + dataframes api có tính năng này:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path); 
Các vấn đề liên quan