2013-12-16 34 views
16

Tôi đang ở Spark, tôi có RDD từ tệp Avro. Bây giờ tôi muốn thực hiện một số phép biến đổi trên RDD đó và lưu nó lại dưới dạng tệp Avro:Spark: Ghi vào tập tin Avro

val job = new Job(new Configuration()) 
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema)) 

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2)) 
    .saveAsNewAPIHadoopFile(outputPath, 
    classOf[AvroKey[GenericRecord]], 
    classOf[org.apache.hadoop.io.NullWritable], 
    classOf[AvroKeyOutputFormat[GenericRecord]], 
    job.getConfiguration) 

Khi chạy Spark này, lược đồ này không thể tuần tự hóa được.

Nếu tôi bỏ ghi chú cuộc gọi .map (và chỉ có rdd.saveAsNewAPIHadoopFile), cuộc gọi thành công.

Tôi đang làm gì sai ở đây?

Bất kỳ ý tưởng nào?

+0

Bạn có thể vui lòng cung cấp dấu vết ngăn xếp ngoại lệ không? Các số phiên bản Spark, Hadoop và Avro cũng có thể hữu ích. – Wildfire

+0

Hãy tha thứ cho sự ngây thơ của tôi. Tôi có thể hỏi công việc đang làm gì ở đây không? Có vẻ như đó là một công việc giảm bản đồ? Nếu chúng ta sử dụng tia lửa để viết ra, tại sao chúng ta cần một công việc giảm bản đồ? –

Trả lời

2

Trình nối tiếp mặc định được Spark sử dụng là tuần tự hóa Java. Vì vậy, đối với tất cả các loại java nó sẽ cố gắng serialize bằng cách sử dụng Java serialization. AvroKey không phải là serializable, vì vậy bạn đang nhận được lỗi.

Bạn có thể sử dụng KryoSerializer hoặc plugin trong serialization tùy chỉnh của bạn (như Avro). Bạn có thể đọc thêm về tuần tự hóa tại đây. http://spark-project.org/docs/latest/tuning.html

Bạn cũng có thể bọc đối tượng của mình bằng thứ gì đó có thể được bên ngoài. Kiểm tra ví dụ SparkFlumeEvent kết thúc tốt đẹp AvroFlumeEvent tại đây: https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

5

Vấn đề ở đây liên quan đến tính không nối tiếp của lớp avro.Schema được sử dụng trong Công việc. Ngoại lệ được ném ra khi bạn cố gắng tham khảo đối tượng lược đồ từ mã bên trong hàm bản đồ.

Ví dụ, nếu bạn cố gắng làm như sau, bạn sẽ nhận được "Task không serializable" ngoại lệ:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
... 
rdd.map(t => { 
    // reference to the schema object declared outside 
    val record = new GenericData.Record(schema) 
}) 

Bạn có thể làm tất cả mọi thứ để làm việc bằng cách chỉ cần tạo một thể hiện mới của giản đồ bên trong hàm khối:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.map(t => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 
    val record = new GenericData.Record(innserSchema) 
    ... 
}) 

vì bạn không muốn phân tích giản đồ Avro cho mỗi bản ghi bạn xử lý, một giải pháp tốt hơn sẽ phân tích các lược đồ ở mức phân vùng. Các công trình sau cũng hoạt động:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.mapPartitions(tuples => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 

    tuples.map(t => { 
    val record = new GenericData.Record(innserSchema) 
    ... 
    // this closure will be bundled together with the outer one 
    // (no serialization issues) 
    }) 
}) 

Mã trên hoạt động miễn là bạn cung cấp tham chiếu di động cho tệp jsonSchema, vì chức năng bản đồ sẽ được thực thi bởi nhiều người thực thi từ xa. Nó có thể là một tham chiếu đến một tệp trong HDFS hoặc nó có thể được đóng gói cùng với ứng dụng trong JAR (bạn sẽ sử dụng các hàm nạp lớp để lấy nội dung của nó trong trường hợp sau).

Đối với những người đang cố gắng sử dụng Avro với Spark, nhận thấy rằng vẫn còn một số vấn đề chưa được giải quyết biên soạn và bạn phải sử dụng việc nhập khẩu sau trên Maven POM:

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-mapred</artifactId> 
    <version>1.7.7</version> 
    <classifier>hadoop2</classifier> 
<dependency> 

Lưu ý "hadoop2" phân loại. Bạn có thể theo dõi sự cố tại https://issues.apache.org/jira/browse/SPARK-3039.

+0

Phương pháp này hoạt động tốt khi không có phụ thuộc bên ngoài bên trong chức năng bản đồ của chúng tôi. Có cách nào để làm cho schema serializable? – COSTA

Các vấn đề liên quan