Tôi là người mới tham gia Apache Spark và đang học các chức năng cơ bản. Đã có một nghi ngờ nhỏ. Giả sử tôi có RDD bộ dữ liệu (khóa, giá trị) và muốn lấy một số thứ duy nhất trong số chúng. Tôi sử dụng hàm distinct(). Tôi tự hỏi những gì cơ sở nào chức năng xem xét rằng tuples là khác nhau ..? Là nó dựa trên các phím, hoặc các giá trị, hoặc cả hai?Chức năng Distinct() hoạt động như thế nào trong Spark?
Trả lời
Các tài liệu API cho RDD.distinct() chỉ cung cấp một mô tả một câu: "Quay trở lại một RDD mới có chứa các yếu tố khác biệt trong RDD này"
Từ kinh nghiệm gần đây tôi có thể cho bạn biết rằng trong một bộ tuple-RDD, toàn bộ bộ dữ liệu được xem xét.
Nếu bạn muốn phím riêng biệt hoặc giá trị khác biệt, sau đó tùy thuộc vào chính xác những gì bạn muốn đạt được, bạn có thể:
A. gọi groupByKey()
chuyển {(k1,v11),(k1,v12),(k2,v21),(k2,v22)}
để {(k1,[v11,v12]), (k2,[v21,v22])}
; hoặc
B. loại bỏ một trong hai phím hoặc các giá trị bằng cách gọi keys()
hoặc values()
Tiếp theo distinct()
Trong bài viết này (tháng 6 năm 2015) UC Berkeley + edx đang chạy một khóa học trực tuyến miễn phí Introduction to Big Data and Apache Spark mà sẽ cung cấp tay trên thực hành với các chức năng này.
distinct
sử dụng phương pháp hashCode
và equals
của đối tượng để xác định này. Tuples được xây dựng trong với các cơ chế bình đẳng ủy thác xuống vào bình đẳng và vị trí của từng đối tượng. Vì vậy, distinct
sẽ hoạt động với toàn bộ đối tượng Tuple2
. Như Paul đã chỉ ra, bạn có thể gọi keys
hoặc values
và sau đó distinct
. Hoặc bạn có thể viết các giá trị riêng biệt của riêng mình qua aggregateByKey
, điều này sẽ giữ cho cặp khóa chính. Hoặc nếu bạn muốn các khóa riêng biệt, thì bạn có thể sử dụng một số thông thường là aggregate
Cảm ơn bạn! Có ý nghĩa. –
.distinct() chắc chắn đang thực hiện trộn trên các phân vùng. Để xem thêm về những gì đang xảy ra, hãy chạy một .toDebugString trên RDD của bạn.
val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)
mà cho một ví dụ RDD Tôi có (myRDDPreStep đã được băm-chia theo mã, vẫn kiên trì bởi StorageLevel.MEMORY_AND_DISK_SER, và checkpointed), trả về:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
Lưu ý rằng có thể có hiệu quả hơn cách để có được một khác biệt mà liên quan đến ít shuffles, đặc biệt nếu RDD của bạn đã được phân vùng một cách thông minh và các phân vùng không quá sai lệch.
Xem Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct? và Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?
Dường như distinct
sẽ thoát khỏi (key, value) bản sao.
Trong ví dụ dưới đây (1,20) và (2,20) được lặp lại hai lần trong myRDD
, nhưng sau distinct()
, các bản sao sẽ bị xóa.
scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22
scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)
scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Justin Pihony là đúng.Distinct sử dụng phương thức hashCode và bằng của đối tượng để xác định này. lợi nhuận của nó các yếu tố riêng biệt (đối tượng)
val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
biệt
rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Nếu bạn muốn áp dụng riêng biệt trên chìa khóa. Trong trường hợp đó giảm là lựa chọn tốt hơn
ReduceBy
val reduceRDD= rdd.map(tup =>
(tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)
reduceRDD.collect().foreach(println)
Output: -
(2,20)
(1,20)
(3,21)
- 1. Chức năng scanf hoạt động như thế nào trong C?
- 2. Điều khoản DISTINCT của SQL hoạt động như thế nào?
- 3. Chức năng tf.nn.embedding_lookup hoạt động như thế nào?
- 4. Chức năng C cụ thể hoạt động như thế nào?
- 5. Mã chức năng python này hoạt động như thế nào?
- 6. Chức năng này hoạt động như thế nào?
- 7. Chức năng GWT sinkEvent hoạt động như thế nào?
- 8. như thế nào "chậm trễ" chức năng này hoạt động
- 9. Chức năng jQuery pushStack hoạt động như thế nào?
- 10. Chức năng giảm hoạt động như thế nào?
- 11. Chức năng cmp_to_key của Python hoạt động như thế nào?
- 12. Chức năng chai mong đợi hoạt động như thế nào?
- 13. Định nghĩa chức năng sau hoạt động như thế nào?
- 14. Chức năng GetBytes hoạt động như thế nào?
- 15. MapA hoạt động như thế nào với một mũi tên chức năng luồng trong Haskell?
- 16. chức năng gọi lại hoạt động như thế nào trong python multiprocessing map_async
- 17. Chức năng ẩn danh "trường hợp" thực sự hoạt động như thế nào trong Scala?
- 18. Chức năng "tất cả" trong Python hoạt động như thế nào?
- 19. Chức năng thăm dò ý kiến hoạt động như thế nào trong c?
- 20. Chức năng đệ quy hoạt động bên trong 'vòng lặp for' như thế nào
- 21. 'Chức năng' và 'sử dụng' và 'mảng_filter' hoạt động như thế nào trong PHP?
- 22. chức năng phải ngắn như thế nào?
- 23. Tính năng RescueTimes chặn hoạt động như thế nào?
- 24. CHỌN DISTINCT không hoạt động
- 25. C Biến cục bộ có cùng tên với chức năng - nó hoạt động như thế nào?
- 26. Function.prototype.call.bind hoạt động như thế nào?
- 27. Python: chức năng cmp_to_key của functools hoạt động như thế nào?
- 28. Chức năng đặt hàng của Kotlin cao hơn hoạt động như thế nào?
- 29. Chức năng phản hồi javascript này hoạt động như thế nào?
- 30. f1 = lật bản đồ const. Chức năng này hoạt động như thế nào?
Hi Paul! Hãy để tôi giả sử rằng chúng tôi có một bộ dữ liệu RDD như sau: (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22) .. vv, Tại đây bạn có thể quan sát thấy cả khóa và giá trị lặp lại trong nhiều bộ dữ liệu khác nhau. vì vậy nếu tôi áp dụng riêng biệt() trên RDD trên, kết quả sẽ là gì ..? Vui lòng dành chút thời gian. Cảm ơn bạn! Và, có, tôi đã tham gia khóa học đó trực tuyến! :) –
Tôi không có thời gian ngay bây giờ nhưng bạn có thể thiết lập RDD của riêng bạn với 'myRDD = sc.parallelize ([(1,20), (1,21), (1,20), (2,20)), (2,22), (2,20), (3,21), (3,22)]); 'Điều này thậm chí có thể hoạt động trong một trong các sổ ghi chép Lab trước đó từ khóa học Spark. Sau đó chạy 'myRDD.distinct(). Collect() để kiểm tra kết quả đầu ra ' – Paul