2017-06-10 19 views
5

Tôi có RDD với các phần tử của các loại khác nhau và tôi muốn đếm chúng theo loại của chúng, ví dụ: mã bên dưới sẽ hoạt động chính xác.Bộ lọc reck tia lửa theo lớp phần tử

scala> val rdd = sc.parallelize(List(1, 2.0, "abc")) 
rdd: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[0] at parallelize at <console>:24 

scala> rdd.filter{case z:Int => true; case _ => false}.count 
res0: Long = 1 

scala> rdd.filter{case z:String => true; case _ => false}.count 
res1: Long = 1 

Bây giờ nếu các yếu tố thuộc loại do người dùng xác định, mã bên dưới sẽ không hoạt động như mong đợi.

scala> class TypeA extends Serializable    // this is the base class 
defined class TypeA 

scala> case class TypeB(id:Long) extends TypeA  // derived class 1 
defined class TypeB 

scala> case class TypeC(name:String) extends TypeA // derived class 2 
defined class TypeC 

scala> val rdd1 = sc.parallelize(List(TypeB(123), TypeC("jack"), TypeB(456))) // create an rdd with different types of elements 
rdd1: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[3] at parallelize at <console>:29 

scala> rdd1.count    // total size is correct 
res2: Long = 3 

scala> rdd1.filter{case z:TypeB => true; case _ => false}.count // what the hell? 
res3: Long = 0 

scala> rdd1.filter{case z:TypeC => true; case _ => false}.count // again ? 
res4: Long = 0 

scala> rdd1.filter{case z:TypeA => true; case _ => false}.count // only works for the base class? 
res5: Long = 3 

Tôi có bỏ lỡ bất cứ điều gì ở đây không? Xin vui lòng giúp đỡ!

Trả lời

3

Điều này có vẻ giống như biến thể của Spark-1199 và có khả năng là lỗi REPL.

hành vi dự kiến ​​sản lượng này khi chạy cục bộ bên trong IDEA:

import org.apache.spark.SparkContext 

class TypeA extends Serializable 
case class TypeB(id:Long) extends TypeA 
case class TypeC(name:String) extends TypeA 

val sc = new SparkContext("local[*]", "swe") 
val rdd = sc.parallelize(List(TypeB(12), TypeC("Hsa"))) 

rdd.filter { case x: TypeB => true; case _ => false }.count() 

Sản lượng:

import org.apache.spark.SparkContext 

defined class TypeA 
defined class TypeB 
defined class TypeC 

sc: org.apache.spark.SparkContext = [email protected] 
rdd: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[0] at parallelize at <console>:18 

[Stage 0:>....... (0 + 0)/4] 
res0: Long = 1 
+1

Thật vậy, oops !! Cảm ơn bạn đã thử trên Spark env của bạn :-) – avocado

Các vấn đề liên quan