2015-02-09 13 views
7

Tôi đã triển khai giải pháp nhóm RDD[K, V] theo khóa và tính toán dữ liệu theo từng nhóm (K, RDD[V]), sử dụng partitionByPartitioner. Tuy nhiên, tôi không chắc liệu nó có thực sự hiệu quả hay không và tôi muốn có quan điểm của bạn.Sử dụng PartitionBy để chia và tính toán hiệu quả các nhóm RDD bằng phím

Dưới đây là một trường hợp mẫu: theo một danh sách các [K: Int, V: Int], tính V s có nghĩa là cho từng nhóm K, biết rằng nó nên được phân phối và rằng V giá trị có thể rất lớn. Điều đó sẽ cung cấp cho:

List[K, V] => (K, mean(V)) 

Lớp phân vùng đơn giản:

class MyPartitioner(maxKey: Int) extends Partitioner { 

    def numPartitions = maxKey 

    def getPartition(key: Any): Int = key match { 
     case i: Int if i < maxKey => i 
    } 
    } 

Mã phân vùng:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)) 

     val rdd = sc.parallelize(l) 
     val p = rdd.partitionBy(new MyPartitioner(4)).cache() 

     p.foreachPartition(x => { 
     try { 
      val r = sc.parallelize(x.toList) 
      val id = r.first() //get the K partition id 
      val v = r.map(x => x._2) 
      println(id._1 + "->" + mean(v)) 
     } catch { 
      case e: UnsupportedOperationException => 0 
     } 
     }) 

Đầu ra là:

1->13, 2->4, 3->7

Câu hỏi của tôi là:

  1. điều gì thực sự xảy ra khi gọi partitionBy? (xin lỗi, tôi không tìm đủ thông số kỹ thuật)
  2. Thực sự hiệu quả khi lập bản đồ theo phân vùng, biết rằng trong trường hợp sản xuất của tôi sẽ không có quá nhiều khóa (như 50 đối với mẫu) bởi nhiều giá trị (như 1 triệu cho mẫu)
  3. Chi phí của paralellize(x.toList) là bao nhiêu? Có nhất quán không? (Tôi cần một số RDD ở đầu vào của mean())
  4. Bạn tự làm điều đó như thế nào?

Trân

Trả lời

4

Mã của bạn không nên làm việc. Bạn không thể chuyển đối tượng SparkContext cho các người thi hành. (Nó không phải là Serializable.) Ngoài ra tôi không thấy lý do tại sao bạn cần.

Để tính giá trị trung bình, bạn cần tính tổng và số lượng và tính tỷ lệ của chúng. Trình phân vùng mặc định sẽ hoạt động tốt.

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = { 
    case class SumCount(sum: Double, count: Double) 
    val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0), 
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count)) 
    sumCounts.map(sc => sc.sum/sc.count) 
} 

Đây là tính toán một lần hiệu quả tổng quát tốt.

+0

cảm ơn câu trả lời của bạn, tất nhiên nó không thể hoạt động, tôi không có tất cả các phản xạ của các thủ thuật mã hóa tia lửa và tôi đã bị hư hỏng bởi jvm địa phương của tôi. Tuy nhiên, trên thực tế, tôi không cần phải tính giá trị trung bình, mà là một phương pháp ml phức tạp, và tôi cần một RDD [Vector]. Làm thế nào tôi có thể nhận được một danh sách (key, RDD [Vector]) từ một RDD duy nhất [Int, Int]? Tôi không tìm được giải pháp. – Seb

+0

Tôi nghĩ rằng đây là một chủ đề tương tự sau đó: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 Tôi không chắc chắn làm thế nào bạn muốn làm cho 'Vector' từ 'Int' s. Nhưng nếu bạn muốn lấy một RDD cho mỗi khóa, bạn cần phải phân tách RDD gốc và điều này được thảo luận trong câu trả lời được liên kết. Nếu nó không cho bạn câu trả lời, tôi đề nghị hỏi một câu hỏi khác, có lẽ với một lời giải thích rõ ràng, cao cấp về những gì bạn muốn làm. –

Các vấn đề liên quan