2016-11-08 23 views
5

Tôi có dữ liệu đầu vào sau (trong Parquet) cho một công việc spark:Spark dataframe tham gia với phạm vi chậm

Person (millions of rows) 
+---------+----------+---------------+---------------+ 
| name | location |  start  |  end  | 
+---------+----------+---------------+---------------+ 
| Person1 |  1230 | 1478630000001 | 1478630000010 | 
| Person2 |  1230 | 1478630000002 | 1478630000012 | 
| Person2 |  1230 | 1478630000013 | 1478630000020 | 
| Person3 |  3450 | 1478630000001 | 1478630000015 | 
+---------+----------+---------------+---------------+ 


Event (millions of rows) 
+----------+----------+---------------+ 
| event | location | start_time | 
+----------+----------+---------------+ 
| Biking |  1230 | 1478630000005 | 
| Skating |  1230 | 1478630000014 | 
| Baseball |  3450 | 1478630000015 | 
+----------+----------+---------------+ 

và tôi cần phải để biến nó thành những kết quả mong đợi sau:

[{ 
    "name" : "Biking", 
    "persons" : ["Person1", "Person2"] 
}, 
{ 
    "name" : "Skating", 
    "persons" : ["Person2"] 
}, 
{ 
    "name" : "Baseball", 
    "persons" : ["Person3"] 
}] 

Bằng chữ: kết quả là danh sách của từng sự kiện, mỗi danh sách có một danh sách những người tham gia sự kiện này.

Một người đếm như người tham gia nếu

Person.start < Event.start_time 
&& Person.end > Event.start_time 
&& Person.location == Event.location 

Tôi đã thử các cách tiếp cận khác nhau, nhưng là người duy nhất mà thực sự dường như làm việc là tham gia hai dataframes và sau đó nhóm/tổng hợp chúng bằng sự kiện. Nhưng kết nối cực kỳ chậm và không phân phối tốt trên nhiều lõi CPU.

mã hiện cho Tham gia:

final DataFrame fullFrame = persons.as("persons") 
    .join(events.as("events"), col("persons.location").equalTo(col("events.location")) 
       .and(col("events.start_time").geq(col("persons.start"))) 
       .and(col("events.start_time").leq(col("persons.end"))), "inner"); 

//count to have an action 
fullFrame.count(); 

Tôi đang sử dụng độc lập Spark và Java, nếu điều này làm cho một sự khác biệt.

Có ai có ý tưởng hay hơn về cách giải quyết vấn đề này với Spark 1.6.2 không?

Trả lời

1

Kết nối phạm vi được thực hiện dưới dạng sản phẩm phụ với bước lọc tiếp theo. Một giải pháp có khả năng tốt hơn có thể là, để phát sóng bảng có khả năng nhỏ hơn events và sau đó ánh xạ bảng persons: bên trong bản đồ, kiểm tra điều kiện kết nối và tạo kết quả tương ứng.

+0

Thực tế việc sử dụng "kết nối phát sóng" đã cải thiện điều này rất nhiều, tôi phải chia bảng sự kiện thành nhiều phần nhỏ hơn phù hợp với bộ nhớ và kết hợp chúng từng người một. –

Các vấn đề liên quan