2015-06-26 18 views
11

Apache Spark pyspark.RDD Tài liệu API đề cập rằng groupByKey() không hiệu quả. Thay vào đó, bạn nên sử dụng reduceByKey(), aggregateByKey(), combineByKey() hoặc foldByKey() thay thế. Điều này sẽ dẫn đến làm một số tập hợp trong các công nhân trước khi trộn, do đó làm giảm xáo trộn dữ liệu trên công nhân.Apache Spark: Việc triển khai tương đương RDD.groupByKey() bằng cách sử dụng RDD.aggregateByKey() là gì?

Đưa ra tập dữ liệu sau và biểu thức groupByKey(), triển khai tương đương và hiệu quả (giảm xáo trộn dữ liệu công nhân chéo) không sử dụng groupByKey(), nhưng mang lại kết quả tương tự?

dataset = [("a", 7), ("b", 3), ("a", 8)] 
rdd = (sc.parallelize(dataset) 
     .groupByKey()) 
print sorted(rdd.mapValues(list).collect()) 

Output:

[('a', [7, 8]), ('b', [3])] 
+0

Dữ liệu của bạn có được phân đoạn ngẫu nhiên hay bằng khóa không? Nếu bạn có thể đảm bảo rằng tất cả các bản ghi có a._1 = "a" nằm trên cùng một phân vùng, bạn có thể tăng tốc độ đáng kể - bạn có thể thoát khỏi mà không cần bất kỳ dấu hiệu nào, ngoài các phân đoạn cần thiết cho phân vùng ban đầu . Có thể thử sử dụng một trình phân vùng băm? –

Trả lời

18

As far as Tôi có thể nói không có gì để đạt được * trong trường hợp cụ thể này bằng cách sử dụng aggregateByKey hoặc một chức năng tương tự. Vì bạn đang xây dựng một danh sách, không có giảm "thực" và lượng dữ liệu phải được xáo trộn nhiều hay ít.

Để thực sự quan sát một số hiệu suất, bạn cần chuyển đổi thực sự làm giảm lượng dữ liệu được chuyển cho ví dụ, tính toán thống kê tóm tắt, tìm các yếu tố độc đáo.

Về lợi ích khác nhau của việc sử dụng reduceByKey(), combineByKey() hoặc foldByKey() có sự khác biệt khái niệm quan trọng dễ thấy hơn khi bạn xem xét chữ ký Scala API.

Cả hai reduceByKeyfoldByKey bản đồ từ RDD[(K, V)] đến RDD[(K, V)] trong khi thứ hai cung cấp phần tử bổ sung bằng không.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)] 

combineByKey (không có aggregateByKey, nhưng nó là cùng một loại chuyển đổi) biến RDD[(K, V)]-RDD[(K, C)]:

combineByKey[C](
    createCombiner: (V) ⇒ C, 
    mergeValue: (C, V) ⇒ C, 
    mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Trở lại với ví dụ của bạn chỉ combineByKey (và trong PySpark aggregateByKey) là thực sự áp dụng vì bạn đang chuyển đổi từ RDD[(String, Int)] thành RDD[(String, List[Int])].

Trong thời gian ở một ngôn ngữ động như Python nó thực sự có thể thực hiện như một thao tác dùng foldByKey hoặc reduceByKey nó làm cho ngữ nghĩa của mã không rõ ràng và trích dẫn @ tim-peters "Nên có cùng-- và tốt nhất là chỉ có một - cách rõ ràng để làm điều đó "[1].

Sự khác nhau giữa aggregateByKeycombineByKey là khá nhiều giống như giữa reduceByKeyfoldByKey như vậy cho một danh sách đó chủ yếu là một vấn đề của hương vị:

def merge_value(acc, x): 
    acc.append(x) 
    return acc 

def merge_combiners(acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
    .combineByKey(
     lambda x: [x], 
     lambda u, v: u + [v], 
     lambda u1,u2: u1+u2)) 

Trong thực tế, bạn nên thích groupByKey mặc dù. Việc triển khai PySpark được tối ưu hóa một cách đáng kể so với việc triển khai ngây thơ như được thực hiện ở trên.

1.Peters, T. PEP 20 - Zen of Python. (2004). tại https://www.python.org/dev/peps/pep-0020/


* Trong thực tế, thực sự có rất nhiều điều cần giải phóng ở đây, đặc biệt khi sử dụng PySpark. Việc triển khai Python của groupByKey được tối ưu hóa đáng kể hơn so với kết hợp ngây thơ bằng khóa. Bạn có thể kiểm tra Be Smart About groupByKey, được tạo bởi tôi và @eliasah để thảo luận thêm.

+0

Nếu bạn sử dụng một trình phân vùng (ví dụ, phân vùng bằng một mã băm của khóa), bạn có thể lấy đi mà không cần bất kỳ dấu xáo trộn nào khác không? –

+0

@GlennStrycker Theo như tôi biết câu trả lời là tích cực. Nếu RDD được phân đoạn bằng khóa thì tất cả các giá trị cho một khóa nhất định phải được xử lý cục bộ trên một nút duy nhất. Vấn đề có thể xảy ra là sự phân phối các phím sai lệch. – zero323

3

Đây là một lựa chọn có sử dụng aggregateByKey(). Tôi rất tò mò muốn biết điều này có thể được thực hiện bằng cách sử dụng reduceByKey(), combineByKey() hoặc foldByKey() và chi phí/lợi ích nào cho mỗi giải pháp thay thế.

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: u+[v], 
         lambda u1,u2: u1+u2)) 
print sorted(rdd.mapValues(list).collect()) 

Output:

[('a', [7, 8]), ('b', [3])] 

Sau đây là một ký ức nhẹ hơn thực hiện hiệu quả, mặc dù ít có thể đọc được để những người mới bắt trăn, trong đó sản xuất cùng một sản lượng:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: itertools.chain(u,[v]), 
         lambda u1,u2: itertools.chain(u1,u2))) 
print sorted(rdd.mapValues(list).collect()) 
Các vấn đề liên quan