2016-11-02 16 views
6

Tôi tham gia hai bộ dữ liệu lớn bằng cách sử dụng Spark RDD, một tập dữ liệu bị lệch rất nhiều nên rất ít trong số các tác vụ thực thi mất nhiều thời gian để hoàn thành công việc, làm cách nào tôi có thể giải quyết tình huống này?Skewed dataset tham gia vào Spark?

Trả lời

0

Bạn có thể thử phân vùng lại RDD "bị lệch" thành nhiều phân vùng hơn hoặc cố gắng tăng spark.sql.shuffle.partitions (theo mặc định là 200).

Trong trường hợp của bạn, tôi sẽ cố gắng đặt số lượng phân vùng cao hơn nhiều so với số lượng người thực thi.

+1

spark.sql.shuffle.partitions sẽ không giúp dữ liệu sai lệch. Sẽ có 200 phân vùng nhưng chỉ một vài trong số chúng sẽ có dữ liệu. – Luniam

4

bài viết Khá tốt về cách nó có thể được thực hiện: https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

phiên bản ngắn:

  • Thêm yếu tố ngẫu nhiên để RDD lớn và tạo tham gia khóa mới với nó
  • Thêm yếu tố ngẫu nhiên để RDD nhỏ sử dụng explode/flatMap để tăng số lượng mục nhập và tạo khóa kết hợp mới
  • Tham gia RDD trên khóa kết nối mới sẽ được phân phối tốt hơn do gieo ngẫu nhiên
2

Tùy thuộc vào loại skew cụ thể mà bạn đang gặp phải, có thể có nhiều cách khác nhau để giải quyết nó. Ý tưởng cơ bản là:

  • Sửa đổi cột của bạn tham gia, hoặc tạo ra một tham gia cột mới, đó không phải là sai lệch nhưng mà vẫn giữ được đầy đủ thông tin để thực hiện tham gia
  • Làm tham gia vào đó cột không sai lệch - - kết quả là phân vùng sẽ không được lệch
  • sau khi tham gia, bạn có thể cập nhật các cột tham gia trở lại định dạng ưa thích của bạn, hoặc thả nó nếu bạn đã tạo một cột mới

các "chiến đấu với skew Trong Spark" bài viết tham chiếu trong câu trả lời của LiMuBei là một kỹ thuật tốt nếu các dữ liệu bị xáo trộn ipates trong tham gia. Trong trường hợp của tôi, skew được gây ra bởi một số lượng rất lớn các giá trị null trong cột kết nối. Các giá trị null không tham gia vào phép nối, nhưng vì các phân vùng Spark trên cột kết nối, các phân vùng sau tham gia rất sai lệch khi có một phân vùng khổng lồ chứa tất cả các giá trị rỗng.

Tôi đã giải quyết nó bằng cách thêm một cột mới đã thay đổi tất cả giá trị null thành giá trị tạm thời được phân phối tốt, chẳng hạn như "NULL_VALUE_X", trong đó X được thay thế bằng số ngẫu nhiên giữa 1 và 10.000, ví dụ: (Trong Java):

// Before the join, create a join column with well-distributed temporary values for null swids. This column 
// will be dropped after the join. We need to do this so the post-join partitions will be well-distributed, 
// and not have a giant partition with all null swids. 
String swidWithDistributedNulls = "swid_with_distributed_nulls"; 
int numNullValues = 10000; // Just use a number that will always be bigger than number of partitions 
Column swidWithDistributedNullsCol = 
    when(csDataset.col(CS_COL_SWID).isNull(), functions.concat(
     functions.lit("NULL_SWID_"), 
     functions.round(functions.rand().multiply(numNullValues))) 
    ) 
    .otherwise(csDataset.col(CS_COL_SWID)); 
csDataset = csDataset.withColumn(swidWithDistributedNulls, swidWithDistributedNullsCol); 

Sau đó tham gia vào cột mới này, và sau đó sau khi tham gia:

outputDataset.drop(swidWithDistributedNullsCol); 
Các vấn đề liên quan