Nếu bạn không chắc chắn những gì đang xảy ra cách tốt nhất là làm theo Các loại. Bỏ ngầm ClassTag
cho ngắn gọn, chúng tôi bắt đầu với một cái gì đó như thế này
abstract class RDD[T] extends Serializable with Logging
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
Nếu bạn bỏ qua tất cả các thông số bổ sung mà bạn sẽ thấy rằng aggregate
là một chức năng mà bản đồ RDD[T]
-U
. Nó có nghĩa là loại giá trị trong đầu vào RDD
không phải giống như loại giá trị đầu ra. Vì vậy, nó rõ ràng là khác với ví dụ reduce
:
def reduce(func: (T, T) ⇒ T): T
hoặc fold
:
def fold(zeroValue: T)(op: (T, T) => T): T
Giống như fold
, aggregate
đòi hỏi một zeroValue
. Làm thế nào để chọn nó? Nó phải là một yếu tố nhận dạng (trung tính) đối với combOp
.
Bạn cũng phải cung cấp hai chức năng:
seqOp
mà bản đồ (U, T)
-U
combOp
mà bản đồ từ (U, U)
để U
Chỉ cần dựa trên này chữ ký bạn nên đã thấy chỉ seqOp
mới có thể truy cập dữ liệu thô. Phải mất một số giá trị loại U
một loại khác T
và trả về giá trị loại U
. Trong trường hợp của bạn, đó là chức năng có chữ ký sau
((Int, Int), Int) => (Int, Int)
Tại thời điểm này, bạn có thể nghi ngờ nó được sử dụng cho một số thao tác giống như nếp gấp.
Hàm thứ hai nhận hai đối số thuộc loại U
và trả về giá trị loại U
. Như đã nêu trước đây, rõ ràng là nó không chạm vào dữ liệu gốc và chỉ có thể hoạt động trên các giá trị đã được xử lý bởi seqOp
. Trong trường hợp của bạn, hàm này có chữ ký như sau:
((Int, Int), (Int, Int)) => (Int, Int)
Vậy làm cách nào để chúng ta có thể kết hợp tất cả chúng lại với nhau?
Trước mỗi phân vùng được tổng hợp sử dụng tiêu chuẩn Iterator.aggregate
với zeroValue
, seqOp
và combOp
truyền như z
, seqop
và combop
respectivelly.Kể từ InterruptibleIterator
sử dụng trong nội không ghi đè aggregate
nó phải được thực hiện như một đơn giản foldLeft(zeroValue)(seqOp)
Tiếp theo kết quả một phần thu được từ mỗi phân vùng được tổng hợp sử dụng combOp
Cho phép giả định đầu vào mà RDD có ba phân vùng với sau phân phối các giá trị:
Iterator(1, 2)
Iterator(2, 3)
Iterator()
Bạn có thể mong đợi thực hiện điều đó, bỏ qua trật tự tuyệt đối, sẽ tương đương với một cái gì đó như thế này:
val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
.map(_.foldLeft((0, 0))(seqOp))
.reduce(combOp)
foldLeft
cho một phân vùng duy nhất có thể trông như thế này:
Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
và trên tất cả các phân vùng
Seq((3,2), (6,2), (0,0))
mà kết hợp sẽ cho bạn quan sát kết quả:
(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
Nói chung đây là một mô hình phổ biến bạn sẽ tìm thấy trên khắp Spark nơi bạn vượt qua giá trị trung lập, một hàm sử dụng để xử lý các giá trị cho mỗi phân vùng và một chức năng được sử dụng để hợp nhất một phần cốt liệu từ các phân vùng khác nhau. Một số ví dụ khác bao gồm:
aggregateByKey
- User Defined Functions tổng hợp
Aggregators
trên Spark Datasets
.