2016-02-05 14 views
5

Dưới đây là mã mẫu mà tôi đang chạy. khi công việc spark này chạy, các kết nối Dataframe đang xảy ra bằng cách sử dụng sortmergejoin thay vì broadcastjoin.Phát sóng không xảy ra khi tham gia các khung dữ liệu trong Spark 1.6

def joinedDf (sqlContext: SQLContext, 
       txnTable: DataFrame, 
       countriesDfBroadcast: Broadcast[DataFrame]): 
       DataFrame = { 
        txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"), 
        $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
       } 
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp") 

Broadcastjoin không xảy ra ngay cả khi tôi chỉ định gợi ý broadcast() trong câu lệnh nối.

Trình tối ưu hóa là hashpartitioning khung dữ liệu và nó đang khiến dữ liệu bị lệch.

Có ai đã xem hành vi này không?

Tôi đang chạy trên sợi bằng cách sử dụng Spark 1.6 và HiveContext dưới dạng SQLContext. Công việc tia lửa chạy trên 200 người thực thi. và kích thước dữ liệu của txnTable là 240GB và dữ liệu của các quốc giaDf là 5mb.

Trả lời

7

Cả cách bạn phát sóng DataFrame và cách bạn truy cập không chính xác.

  • Không thể sử dụng chương trình phát sóng chuẩn để xử lý cấu trúc dữ liệu phân tán. Nếu bạn muốn thực hiện phát sóng tham gia vào một DataFrame bạn nên sử dụng broadcast chức năng đánh dấu cho DataFrame để phát sóng:

    import org.apache.spark.sql.functions.broadcast 
    
    val countriesDf: DataFrame = ??? 
    val tmp: DataFrame = broadcast(
        countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries") 
    ) 
    
    txnTable.as("df1").join(
        broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
    

    Bên trong nó sẽ collecttmp mà không chuyển đổi từ nội bộ và phát sóng sau đó.

  • đối số tham gia được đánh giá háo hức. Ngay cả khi có thể sử dụng SparkContext.broadcast với giá trị phát sóng cấu trúc dữ liệu phân tán được đánh giá cục bộ trước khi gọi join. Thats 'lý do tại sao chức năng của bạn làm việc ở tất cả nhưng không thực hiện tham gia phát sóng.

+0

Bây giờ, tôi thấy BroadcastHashJoin trong một lần chạy và SortMergeJoin trong một lần chạy khác. (cùng một mã, bộ dữ liệu khác nhau). –

+0

Đoán của tôi là vượt quá ngưỡng kích thước cho các kết nối phát sóng. – zero323

+0

Tôi có một spark.sql.autoBroadcastJoinThreshold rất cao. Xấp xỉ. 1GB. Và tệp được phát là khoảng 5 MB. Tuy nhiên, trong các hoạt động khác, đề xuất trên hoạt động rất tốt. –

Các vấn đề liên quan