2015-09-17 22 views
6

Tôi muốn lưu (dưới dạng tệp lát gỗ) một Khung dữ liệu Spark chứa một lớp tùy chỉnh làm cột. Lớp này được tạo bởi một Seq của một lớp tùy chỉnh khác. Để làm như vậy, tôi tạo một lớp UserDefinedType cho từng lớp này, theo cách tương tự với VectorUDT. Tôi có thể làm việc với các dataframe như tôi dự định nhưng không thể lưu nó vào đĩa như một sàn gỗ (hoặc jason) Tôi báo cáo nó như là một lỗi, nhưng có thể có một vấn đề với mã của tôi. Tôi đã thực hiện một ví dụ đơn giản để hiển thị các vấn đề:Lưu các khung dữ liệu Spark với các kiểu dữ liệu người dùng lồng nhau

import org.apache.spark.sql.SaveMode 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow 
import org.apache.spark.sql.types._ 

@SQLUserDefinedType(udt = classOf[AUDT]) 
case class A(list:Seq[B]) 

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) 
    override def userClass: Class[A] = classOf[A] 
    override def serialize(obj: Any): Any = obj match { 
    case A(list) => 
     val row = new GenericMutableRow(1) 
     row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) 
    } 
    } 
} 

object AUDT extends AUDT 

@SQLUserDefinedType(udt = classOf[BUDT]) 
case class B(num:Int) 

class BUDT extends UserDefinedType[B] { 
    override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) 
    override def userClass: Class[B] = classOf[B] 
    override def serialize(obj: Any): Any = obj match { 
    case B(num) => 
     val row = new GenericMutableRow(1) 
     row.setInt(0, num) 
     row 
    } 

    override def deserialize(datum: Any): B = { 
    datum match { 
     case row: InternalRow => new B(row.getInt(0)) 
    } 
    } 
} 

object BUDT extends BUDT 

object TestNested { 
    def main(args:Array[String]) = { 
    val col = Seq(new A(Seq(new B(1), new B(2))), 
        new A(Seq(new B(3), new B(4)))) 

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(1 to 2 zip col).toDF() 
    df.show() 

    df.write.mode(SaveMode.Overwrite).save(...) 
    } 
} 

Điều này dẫn đến các lỗi sau:

15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42) at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.sql.types.StructType.map(StructType.scala:92) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58) at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):

Nếu một tiết kiệm dataframe với B thay vì của A không có vấn đề tồn tại từ B như không lớp tùy chỉnh lồng nhau. Tui bỏ lỡ điều gì vậy?

Trả lời

2

Tôi phải thực hiện bốn thay đổi đối với mã của bạn để làm cho nó hoạt động (được thử nghiệm trong Spark 1.6.0 trên Linux) và tôi nghĩ rằng tôi có thể chủ yếu là giải thích lý do tại sao chúng cần. Tôi thấy mình tự hỏi, tuy nhiên, cho dù có một giải pháp đơn giản hơn. Tất cả những thay đổi trong AUDT, như sau:

  1. Khi xác định sqlType, làm cho nó phụ thuộc vào BUDT.sqlType, thay vì chỉ BUDT.
  2. Trong serialize(), hãy gọi BUDT.serialize() trên từng phần tử danh sách.
  3. Trong deserialize():
    • gọi toArray(BUDT.sqlType) thay vì toArray(BUDT)
    • gọi BUDT.deserialize() trên mọi phần tử

Dưới đây là mã kết quả:

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = 
    StructType(
     Seq(StructField("list", 
         ArrayType(BUDT.sqlType, containsNull = false), 
         nullable = true))) 

    override def userClass: Class[A] = classOf[A] 

    override def serialize(obj: Any): Any = 
    obj match { 
     case A(list) => 
     val row = new GenericMutableRow(1) 
     val elements = 
      list.map(_.asInstanceOf[Any]) 
       .map(e => BUDT.serialize(e)) 
       .toArray 
     row.update(0, new GenericArrayData(elements)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => 
     val first = row.getArray(0) 
     val bs:Array[InternalRow] = first.toArray(BUDT.sqlType) 
     val bseq = bs.toSeq.map(e => BUDT.deserialize(e)) 
     val a = new A(bseq) 
     a 
    } 
    } 

} 

Tất cả bốn ch anges có cùng một ký tự: mối quan hệ giữa việc xử lý A s và xử lý B s giờ đây rất rõ ràng: đối với việc nhập giản đồ, cho tuần tự hóa và de-serialization. Mã ban đầu dường như dựa trên giả định rằng Spark SQL sẽ "chỉ ra nó", có thể là hợp lý, nhưng dường như nó không.

+0

Đã hiệu quả. Tôi đang sử dụng các đối tượng "phức tạp" như các cột trong một DataFrame và trong Spark 1.6.0 nó ngừng hoạt động. Điều này đã làm các trick, vì vậy bài học tôi đã học được chỉ là làm cho tất cả mọi thứ liên quan đến serialization/de-serialization rất rõ ràng. Chúc mừng! –

Các vấn đề liên quan