Tôi muốn lưu (dưới dạng tệp lát gỗ) một Khung dữ liệu Spark chứa một lớp tùy chỉnh làm cột. Lớp này được tạo bởi một Seq của một lớp tùy chỉnh khác. Để làm như vậy, tôi tạo một lớp UserDefinedType cho từng lớp này, theo cách tương tự với VectorUDT. Tôi có thể làm việc với các dataframe như tôi dự định nhưng không thể lưu nó vào đĩa như một sàn gỗ (hoặc jason) Tôi báo cáo nó như là một lỗi, nhưng có thể có một vấn đề với mã của tôi. Tôi đã thực hiện một ví dụ đơn giản để hiển thị các vấn đề:Lưu các khung dữ liệu Spark với các kiểu dữ liệu người dùng lồng nhau
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])
class AUDT extends UserDefinedType[A] {
override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
override def userClass: Class[A] = classOf[A]
override def serialize(obj: Any): Any = obj match {
case A(list) =>
val row = new GenericMutableRow(1)
row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
row
}
override def deserialize(datum: Any): A = {
datum match {
case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
}
}
}
object AUDT extends AUDT
@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num:Int)
class BUDT extends UserDefinedType[B] {
override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false)))
override def userClass: Class[B] = classOf[B]
override def serialize(obj: Any): Any = obj match {
case B(num) =>
val row = new GenericMutableRow(1)
row.setInt(0, num)
row
}
override def deserialize(datum: Any): B = {
datum match {
case row: InternalRow => new B(row.getInt(0))
}
}
}
object BUDT extends BUDT
object TestNested {
def main(args:Array[String]) = {
val col = Seq(new A(Seq(new B(1), new B(2))),
new A(Seq(new B(3), new B(4))))
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(1 to 2 zip col).toDF()
df.show()
df.write.mode(SaveMode.Overwrite).save(...)
}
}
Điều này dẫn đến các lỗi sau:
15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42) at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.sql.types.StructType.map(StructType.scala:92) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58) at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):
Nếu một tiết kiệm dataframe với B thay vì của A không có vấn đề tồn tại từ B như không lớp tùy chỉnh lồng nhau. Tui bỏ lỡ điều gì vậy?
Đã hiệu quả. Tôi đang sử dụng các đối tượng "phức tạp" như các cột trong một DataFrame và trong Spark 1.6.0 nó ngừng hoạt động. Điều này đã làm các trick, vì vậy bài học tôi đã học được chỉ là làm cho tất cả mọi thứ liên quan đến serialization/de-serialization rất rõ ràng. Chúc mừng! –