2015-01-27 14 views
5

Tôi đang sử dụng tia lửa có scala và tôi có RDD đầy đủ tuple2 chứa một đối tượng phức tạp làm khóa và tăng gấp đôi. Mục đích là để thêm đôi (tần số) nếu đối tượng là giống hệt nhau.reduceByKey sử dụng đối tượng Scala làm khóa

cho rằng tôi đã xác định đối tượng của tôi như sau:

case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{ 
     def compare(that: SimpleCoocurrence) = { 
     if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos) 
      &&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos)) 
      0 
     else 
      this.toString.compareTo(that.toString) 
     } 
    } 

bây giờ tôi đang cố gắng sử dụng reduceBykey như thế:

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(coocRDD.count) 

Nhưng, kết quả cho thấy rằng RDD trước và sau khi xử lý một reducebykey chứa chính xác cùng một số yếu tố.

Tôi làm cách nào để thực hiện reduceByKey bằng tuple2 [SimpleCoocurrence, Double]? Có phải thực hiện đặc tính được sắp xếp theo cách tốt để cho Spark biết cách so sánh các đối tượng của tôi không? Tôi có nên sử dụng chỉ tuple2 [String, Double]?

thx,

Trả lời

5

reduceByKey không sử dụng Thứ tự nhưng hashCodeequals để xác định những phím đều giống nhau. Cụ thể, hashPartitioner sẽ nhóm các khóa bằng các khóa băm, khóa sothat với cùng một hashCode rơi trên cùng một phân vùng để tránh việc giảm thêm có thể xảy ra trên mỗi phân vùng.

các lớp chữ thường có triển khai mặc định là equalshashCode. Có lẽ dữ liệu thử nghiệm được sử dụng có các giá trị khác nhau của trường distance:Double làm cho mỗi trường hợp là một đối tượng duy nhất. Sử dụng nó như là chìa khóa sẽ dẫn đến chỉ các đối tượng giống hệt nhau được giảm như một.

Một cách để giải quyết vấn đề này sẽ được xác định một chìa khóa cho case class và một phương pháp bổ sung cho các đối tượng, một cái gì đó như thế này:

case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Serializable { 
    val key = word + word_pos + cooc + cooc_pos 
} 
object SimpleCoocurrence { 
    val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ??? 
} 

val coocList:List[SimpleCoocurrence] = ??? 
val coocRDD = sc.parallelize(coocList) 
val coocByKey = coocRDD.keyBy(_.key) 
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add) 

(*) mã được cung cấp như hướng dẫn dụ - không biên soạn hoặc thử nghiệm .

+0

https://issues.apache.org/jira/browse/SPARK-10493 – yanghaogn

0

Đầu tiên, tôi câm ...

Tiếp theo, trong trường hợp ai có cùng một vấn đề và muốn sử dụng đối tượng scala phức tạp như chính cho một reduceByKey trên Spark:

Spark biết làm thế nào để so sánh hai đối tượng ngay cả khi họ không thực hiện Ordered. Vì vậy, đoạn mã trên thực sự là fonctionnal.

Vấn đề duy nhất là ... rằng tôi đã in cùng một RDD trước và sau. Khi tôi viết điều này, nó thực sự hoạt động tốt.

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
val newRDD = coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(newRDD.count) 
0

Bạn không lưu trữ kết quả của reduceByKey. Hãy thử thay thế này:

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
val result = coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(result.count) 
Các vấn đề liên quan