2015-05-28 16 views
19

Tôi có một tệp json với một số dữ liệu, tôi có thể tạo DataFrame ra khỏi nó và giản đồ cho phần riêng biệt của nó Tôi quan tâm đến vẻ bề ngoài như sau:Cấu trúc lồng ghép Spark DataFrame có giới hạn cho lựa chọn không?

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root 
|-- attributes: struct (nullable = true) 
| |-- Address2: array (nullable = true) 
| | |-- value: struct (nullable = true) 
| | | |-- Zip: array (nullable = true) 
| | | | |-- element: struct (containsNull = true) 
| | | | | |-- value: struct (nullable = true) 
| | | | | | |-- Zip5: array (nullable = true) 
| | | | | | | |-- element: struct (containsNull = true) 
| | | | | | | | |-- value: string (nullable = true) 

khi tôi đang cố gắng để chỉ cần chọn các lĩnh vực sâu nhất: json.select("attributes.Address2.value.Zip.value.Zip5").collect()

Nó mang lại cho tôi một ngoại lệ: org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value, StructType(StructField(Zip5, ArrayType(StructType(StructField(value, StringType, true)), true), true)), true)), true), true);

Bằng cách xem phương thức resolveGetField của LogicalPlan, tôi thấy rằng có thể chọn từ StructType hoặc từ ArrayType (StructType), nhưng có cách nào để chọn sâu hơn không? Làm thế nào tôi có thể chọn trường tôi cần?

Dưới đây là toàn bộ ngoại lệ.

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true); 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214) 
     at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
     at scala.collection.immutable.List.foldLeft(List.scala:84) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252) 
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.immutable.List.foreach(List.scala:318) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
     at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44) 
     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:40) 
     at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080) 
     at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
     at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157) 
     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:476) 
     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:491) 
     at com.reltio.analytics.PREDF.test(PREDF.scala:55) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) 
     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) 
     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) 
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) 
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) 
     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) 
     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) 
     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) 
     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) 
     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) 
     at org.junit.runners.ParentRunner.run(ParentRunner.java:309) 
     at org.junit.runner.JUnitCore.run(JUnitCore.java:160) 
     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) 
     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) 
     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) 

Trả lời

28

Vấn đề là ArrayType - bạn có thể tái tạo lại lỗi này rất đơn giản:

val df = Seq(Tuple1(Array[String]())).toDF("users") 

Tại thời điểm đó df.printSchema show:

root 
|-- users: array (nullable = true) 
| |-- element: string (containsNull = true) 

Và bây giờ nếu bạn thử:

df.select($"users.element") 

Bạn có cùng ngoại lệ chính xác - GetField is not valid...

Bạn có một vài tùy chọn khác nhau để giải phóng Array. Bạn có thể nhận được ở từng mục với getItem như thế này:

df.select($"users".getItem(0)) 

Và kể từ getItem lợi nhuận Column khác, bạn có thể đào sâu như bạn muốn:

df.select($"attributes.Address2".getItem(0).getField("value").getField("Zip").getItem(...) 
// etc 

Nhưng với một mảng, có thể bạn muốn lập trình thư giãn toàn bộ Array. Nếu bạn nhìn vào cách Hive xử lý việc này, bạn cần phải thực hiện LATERAL VIEW. Trong Spark, bạn sẽ phải sử dụng explode để tạo ra tương đương với một Hive LATERAL VIEW:

case class User(name: String) 
df.explode($"users"){ case Row(arr: Array[String]) => arr.map(User(_)) } 

Lưu ý rằng tôi sử dụng một lớp Trường hợp trong bản đồ của tôi - đây là những gì các tài liệu có. Nếu bạn không muốn tạo ra một lớp trường hợp bạn chỉ có thể trả về một Tuple1 (hoặc Tuple2 hoặc Tuple3 vv):

df.explode($"users"){ case Row(arr: Array[String]) => arr.map(Tuple1(_)) } 
+1

David, nhờ trả lời. Rõ ràng tại sao nó không hoạt động - chỉ có thể dự án từ Struct hoặc Array (Struct) (nó thuộc lớp LogicalPlan). Tôi không muốn bỏ lỡ điều gì đó mà tôi không hiểu lắm. Mặc dù câu trả lời không phải là những gì tôi mong đợi, tôi thực sự biết ơn, vì tôi thấy một người khác, những người đã cố gắng và thất bại. Có vẻ như cách duy nhất là phát nổ, sau đó là dự án. – evgenii

Các vấn đề liên quan