2015-05-21 16 views
6

Tôi đang sử dụng API Spark Scala. Tôi có một DataFrame Spark SQL (đọc từ một file Avro) với sơ đồ sau:Làm thế nào để sử dụng Spark SQL DataFrame với flatMap?

root 
|-- ids: array (nullable = true) 
| |-- element: map (containsNull = true) 
| | |-- key: integer 
| | |-- value: string (valueContainsNull = true) 
|-- match: array (nullable = true) 
| |-- element: integer (containsNull = true) 

Về cơ bản 2 cột [id: Danh sách [Bản đồ [Int, String]], trận đấu: Danh sách [Int]]. dữ liệu mẫu mà trông giống như:

[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)] 
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)] 
... 

Những gì tôi muốn làm là flatMap() mỗi hàng để sản xuất 3 cột [id, tài sản, trận đấu]. Sử dụng 2 hàng trên là dữ liệu đầu vào, chúng tôi sẽ nhận được:

[1,a,0] 
[2,b,0] 
[3,c,1] 
[4,d,0] 
[5,c,1] 
[6,a,0] 
[7,e,1] 
[8,d,0] 
... 

và sau đó groupBy các Stringtài sản (ví dụ: a, b, ...) để sản xuất count("property")sum("match"):

a 2 0 
b 1 0 
c 2 2 
d 2 0 
e 1 1 

tôi muốn làm điều gì đó như:

val result = myDataFrame.select("ids","match").flatMap( 
    (row: Row) => row.getList[Map[Int,String]](1).toArray()) 
result.groupBy("property").agg(Map(
    "property" -> "count", 
    "match" -> "sum")) 

vấn đề là các flatMap chuyển đổi DataFrame thành RDD. Có cách nào tốt để thực hiện hoạt động loại flatMap theo sau là groupBy bằng cách sử dụng DataFrames không?

Trả lời

8

flatMap làm điều gì bạn muốn? Nó chuyển đổi mỗi hàng đầu vào thành 0 hoặc nhiều hàng. Nó có thể lọc chúng ra, hoặc nó có thể thêm những cái mới. Trong SQL để có cùng chức năng bạn sử dụng join. Bạn có thể làm những gì bạn muốn làm với một join?

Ngoài ra, bạn cũng có thể xem Dataframe.explode, chỉ là một loại cụ thể của join (bạn có thể dễ dàng tự tạo explode của riêng mình bằng cách tham gia DataFrame vào UDF). explode lấy một cột làm đầu vào và cho phép bạn tách nó hoặc chuyển đổi nó thành nhiều giá trị và sau đó join hàng gốc trở lại các hàng mới. Vì vậy:

user  groups 
griffin mkt,it,admin 

thể trở thành:

user  group 
griffin mkt 
griffin it 
griffin admin 

Vì vậy, tôi sẽ nói hãy nhìn vào DataFrame.explode và nếu điều đó không giúp bạn có một cách dễ dàng, hãy thử tham gia với UDFs.

+0

Cảm ơn bạn đã trả lời! Phương thức DataFrame.explode chính xác là những gì tôi đang tìm kiếm. –

0

SQL của tôi hơi bị gỉ, nhưng một tùy chọn nằm trong flatMap của bạn để tạo danh sách đối tượng Row và sau đó bạn có thể chuyển đổi RDD kết quả trở lại thành DataFrame.

Các vấn đề liên quan