2015-06-20 83 views
18

Tôi là người mới tham gia Apache Spark và đang học các chức năng cơ bản. Đã có một nghi ngờ nhỏ. Giả sử tôi có RDD bộ dữ liệu (khóa, giá trị) và muốn lấy một số thứ duy nhất trong số chúng. Tôi sử dụng hàm distinct(). Tôi tự hỏi những gì cơ sở nào chức năng xem xét rằng tuples là khác nhau ..? Là nó dựa trên các phím, hoặc các giá trị, hoặc cả hai?Chức năng Distinct() hoạt động như thế nào trong Spark?

Trả lời

8

Các tài liệu API cho RDD.distinct() chỉ cung cấp một mô tả một câu: "Quay trở lại một RDD mới có chứa các yếu tố khác biệt trong RDD này"

Từ kinh nghiệm gần đây tôi có thể cho bạn biết rằng trong một bộ tuple-RDD, toàn bộ bộ dữ liệu được xem xét.

Nếu bạn muốn phím riêng biệt hoặc giá trị khác biệt, sau đó tùy thuộc vào chính xác những gì bạn muốn đạt được, bạn có thể:

A. gọi groupByKey() chuyển {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} để {(k1,[v11,v12]), (k2,[v21,v22])}; hoặc

B. loại bỏ một trong hai phím hoặc các giá trị bằng cách gọi keys() hoặc values() Tiếp theo distinct()

Trong bài viết này (tháng 6 năm 2015) UC Berkeley + edx đang chạy một khóa học trực tuyến miễn phí Introduction to Big Data and Apache Spark mà sẽ cung cấp tay trên thực hành với các chức năng này.

+0

Hi Paul! Hãy để tôi giả sử rằng chúng tôi có một bộ dữ liệu RDD như sau: (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22) .. vv, Tại đây bạn có thể quan sát thấy cả khóa và giá trị lặp lại trong nhiều bộ dữ liệu khác nhau. vì vậy nếu tôi áp dụng riêng biệt() trên RDD trên, kết quả sẽ là gì ..? Vui lòng dành chút thời gian. Cảm ơn bạn! Và, có, tôi đã tham gia khóa học đó trực tuyến! :) –

+2

Tôi không có thời gian ngay bây giờ nhưng bạn có thể thiết lập RDD của riêng bạn với 'myRDD = sc.parallelize ([(1,20), (1,21), (1,20), (2,20)), (2,22), (2,20), (3,21), (3,22)]); 'Điều này thậm chí có thể hoạt động trong một trong các sổ ghi chép Lab trước đó từ khóa học Spark. Sau đó chạy 'myRDD.distinct(). Collect() để kiểm tra kết quả đầu ra ' – Paul

4

distinct sử dụng phương pháp hashCodeequals của đối tượng để xác định này. Tuples được xây dựng trong với các cơ chế bình đẳng ủy thác xuống vào bình đẳng và vị trí của từng đối tượng. Vì vậy, distinct sẽ hoạt động với toàn bộ đối tượng Tuple2. Như Paul đã chỉ ra, bạn có thể gọi keys hoặc values và sau đó distinct. Hoặc bạn có thể viết các giá trị riêng biệt của riêng mình qua aggregateByKey, điều này sẽ giữ cho cặp khóa chính. Hoặc nếu bạn muốn các khóa riêng biệt, thì bạn có thể sử dụng một số thông thường là aggregate

+0

Cảm ơn bạn! Có ý nghĩa. –

22

.distinct() chắc chắn đang thực hiện trộn trên các phân vùng. Để xem thêm về những gì đang xảy ra, hãy chạy một .toDebugString trên RDD của bạn.

val hashPart = new HashPartitioner(<number of partitions>) 

val myRDDPreStep = <load some RDD> 

val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER) 
myRDD.checkpoint 
println(myRDD.toDebugString) 

mà cho một ví dụ RDD Tôi có (myRDDPreStep đã được băm-chia theo mã, vẫn kiên trì bởi StorageLevel.MEMORY_AND_DISK_SER, và checkpointed), trả về:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
    | ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
     | myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated] 
     |  CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B 
     | myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated] 

Lưu ý rằng có thể có hiệu quả hơn cách để có được một khác biệt mà liên quan đến ít shuffles, đặc biệt nếu RDD của bạn đã được phân vùng một cách thông minh và các phân vùng không quá sai lệch.

Xem Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?

1

Dường như distinct sẽ thoát khỏi (key, value) bản sao.

Trong ví dụ dưới đây (1,20) và (2,20) được lặp lại hai lần trong myRDD, nhưng sau distinct(), các bản sao sẽ bị xóa.

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22 

scala> myRDD.collect().foreach(println _) 
(1,20) 
(1,21) 
(1,20) 
(2,20) 
(2,22) 
(2,20) 
(3,21) 
(3,22) 

scala> myRDD.distinct.collect().foreach(println _) 
(2,22) 
(1,20) 
(3,22) 
(2,20) 
(1,21) 
(3,21) 
6

Justin Pihony là đúng.Distinct sử dụng phương thức hashCode và bằng của đối tượng để xác định này. lợi nhuận của nó các yếu tố riêng biệt (đối tượng)

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 

biệt

rdd.distinct.collect().foreach(println) 
(2,22) 
(1,20) 
(3,22) 
(2,20) 
(1,21) 
(3,21) 

Nếu bạn muốn áp dụng riêng biệt trên chìa khóa. Trong trường hợp đó giảm là lựa chọn tốt hơn

ReduceBy

val reduceRDD= rdd.map(tup => 
    (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2) 

reduceRDD.collect().foreach(println) 

Output: -

(2,20) 
(1,20) 
(3,21) 
Các vấn đề liên quan