2016-05-04 18 views
9

Tôi là người học Apache Spark và đã gặp một hành động RDDaggregate mà tôi không biết đầu mối của nó hoạt động như thế nào. một số ai có thể giải thích rõ ràng và giải thích trong bước chi tiết từng bước làm thế nào chúng ta đi đến kết quả dưới đây để biết mã ở đâyRDD tổng hợp trong tia lửa

RDD input = {1,2,3,3} 

RDD Aggregate function : 

rdd.aggregate((0, 0)) 
((x, y) => 
(x._1 + y, x._2 + 1), 
(x, y) => 
(x._1 + y._1, x._2 + y._2)) 

output : {9,4} 

Cảm ơn

Trả lời

18

Nếu bạn không chắc chắn những gì đang xảy ra cách tốt nhất là làm theo Các loại. Bỏ ngầm ClassTag cho ngắn gọn, chúng tôi bắt đầu với một cái gì đó như thế này

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

Nếu bạn bỏ qua tất cả các thông số bổ sung mà bạn sẽ thấy rằng aggregate là một chức năng mà bản đồ RDD[T]-U. Nó có nghĩa là loại giá trị trong đầu vào RDD không phải giống như loại giá trị đầu ra. Vì vậy, nó rõ ràng là khác với ví dụ reduce:

def reduce(func: (T, T) ⇒ T): T 

hoặc fold:

def fold(zeroValue: T)(op: (T, T) => T): T 

Giống như fold, aggregate đòi hỏi một zeroValue. Làm thế nào để chọn nó? Nó phải là một yếu tố nhận dạng (trung tính) đối với combOp.

Bạn cũng phải cung cấp hai chức năng:

  • seqOp mà bản đồ (U, T)-U
  • combOp mà bản đồ từ (U, U) để U

Chỉ cần dựa trên này chữ ký bạn nên đã thấy chỉ seqOp mới có thể truy cập dữ liệu thô. Phải mất một số giá trị loại U một loại khác T và trả về giá trị loại U. Trong trường hợp của bạn, đó là chức năng có chữ ký sau

((Int, Int), Int) => (Int, Int) 

Tại thời điểm này, bạn có thể nghi ngờ nó được sử dụng cho một số thao tác giống như nếp gấp.

Hàm thứ hai nhận hai đối số thuộc loại U và trả về giá trị loại U. Như đã nêu trước đây, rõ ràng là nó không chạm vào dữ liệu gốc và chỉ có thể hoạt động trên các giá trị đã được xử lý bởi seqOp. Trong trường hợp của bạn, hàm này có chữ ký như sau:

((Int, Int), (Int, Int)) => (Int, Int) 

Vậy làm cách nào để chúng ta có thể kết hợp tất cả chúng lại với nhau?

  1. Trước mỗi phân vùng được tổng hợp sử dụng tiêu chuẩn Iterator.aggregate với zeroValue, seqOpcombOp truyền như z, seqopcombop respectivelly.Kể từ InterruptibleIterator sử dụng trong nội không ghi đè aggregate nó phải được thực hiện như một đơn giản foldLeft(zeroValue)(seqOp)

  2. Tiếp theo kết quả một phần thu được từ mỗi phân vùng được tổng hợp sử dụng combOp

Cho phép giả định đầu vào mà RDD có ba phân vùng với sau phân phối các giá trị:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

Bạn có thể mong đợi thực hiện điều đó, bỏ qua trật tự tuyệt đối, sẽ tương đương với một cái gì đó như thế này:

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1) 
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2) 

Seq(Iterator(1, 2), Iterator(3, 3), Iterator()) 
    .map(_.foldLeft((0, 0))(seqOp)) 
    .reduce(combOp) 

foldLeft cho một phân vùng duy nhất có thể trông như thế này:

Iterator(1, 2).foldLeft((0, 0))(seqOp) 
Iterator(2).foldLeft((1, 1))(seqOp) 
(3, 2) 

và trên tất cả các phân vùng

Seq((3,2), (6,2), (0,0)) 

mà kết hợp sẽ cho bạn quan sát kết quả:

(3 + 6 + 0, 2 + 2 + 0) 
(9, 4) 

Nói chung đây là một mô hình phổ biến bạn sẽ tìm thấy trên khắp Spark nơi bạn vượt qua giá trị trung lập, một hàm sử dụng để xử lý các giá trị cho mỗi phân vùng và một chức năng được sử dụng để hợp nhất một phần cốt liệu từ các phân vùng khác nhau. Một số ví dụ khác bao gồm:

  • aggregateByKey
  • User Defined Functions tổng hợp
  • Aggregators trên Spark Datasets.
1

Đây là hiểu biết của tôi để bạn tham khảo:

Hãy tưởng tượng bạn có hai nút, một mất đầu vào của hai yếu tố danh sách đầu tiên {1,2}, và một người khác mất {3, 3}. (Các phân vùng ở đây là chỉ cho thuận tiện)

Tại nút đầu tiên: "(x, y) => (x._1 + y, x._2 + 1)", x đầu tiên là (0 , 0) như đã cho, và y là phần tử đầu tiên của bạn 1, và bạn sẽ có đầu ra (0 + 1, 0 + 1), sau đó đến phần tử thứ hai của bạn y = 2 và đầu ra (1 + 2, 1 + 1), đó là (3, 2)

Tại nút thứ hai, thủ tục tương tự xảy ra song song và bạn sẽ có (6, 2).

"(x, y) => (x._1 + y._1, x._2 + y._2)", yêu cầu bạn hợp nhất hai nút và bạn sẽ nhận được (9,4)


một điều đáng chú ý là (0,0) thực sự được thêm vào kết quả chiều dài (rdd) +1 lần.

"scala> rdd.aggregate ((1,1)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x. _1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9) "

Các vấn đề liên quan