2015-05-02 28 views
17

Tôi có một bảng gỗ với một trong các cột làLàm thế nào để đọc một bộ sưu tập lồng nhau trong Spark

, mảng < struct < col1, col2, .. colN >>

thể chạy các truy vấn đối với bảng này trong Hive bằng cách sử dụng cú pháp VIEW LATERAL.

Cách đọc bảng này thành RDD và quan trọng hơn là cách lọc, ánh xạ vv bộ sưu tập lồng nhau này trong Spark?

Không thể tìm thấy bất kỳ tham chiếu nào về điều này trong tài liệu Spark. Cảm ơn trước vì bất kỳ thông tin nào!

ps. Cảm thấy có thể hữu ích để cung cấp cho một số số liệu thống kê trên bàn. Số cột trong bảng chính ~ 600. Số hàng ~ 200m. Số "cột" trong bộ sưu tập lồng nhau ~ 10. Số lượng bản ghi trung bình trong bộ sưu tập lồng nhau ~ 35.

Trả lời

18

Không có phép thuật trong trường hợp thu thập lồng nhau. Spark sẽ xử lý cùng một cách RDD[(String, String)]RDD[(String, Seq[String])].

Đọc bộ sưu tập lồng nhau như vậy từ tệp Parquet có thể khó khăn.

Hãy lấy một ví dụ từ spark-shell (1.3.1):

scala> import sqlContext.implicits._ 
import sqlContext.implicits._ 

scala> case class Inner(a: String, b: String) 
defined class Inner 

scala> case class Outer(key: String, inners: Seq[Inner]) 
defined class Outer 

Viết file gỗ:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b"))))) 
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25 

scala> outers.toDF.saveAsParquetFile("outers.parquet") 

đọc file gỗ:

scala> import org.apache.spark.sql.catalyst.expressions.Row 
import org.apache.spark.sql.catalyst.expressions.Row 

scala> val dataFrame = sqlContext.parquetFile("outers.parquet") 
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>] 

scala> val outers = dataFrame.map { row => 
    | val key = row.getString(0) 
    | val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1))) 
    | Outer(key, inners) 
    | } 
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848 

Điều quan trọng một phần là row.getAs[Seq[Row]](1). Biểu diễn nội bộ của một chuỗi lồng nhau là structArrayBuffer[Row], bạn có thể sử dụng bất kỳ loại siêu dữ liệu nào thay thế cho số đó thay vì Seq[Row]. 1 là chỉ mục cột ở hàng bên ngoài. Tôi đã sử dụng phương pháp getAs ở đây nhưng có các lựa chọn thay thế trong các phiên bản mới nhất của Spark. Xem mã nguồn của số Row trait.

Bây giờ bạn có RDD[Outer], bạn có thể áp dụng bất kỳ chuyển đổi hoặc hành động mong muốn nào.

// Filter the outers 
outers.filter(_.inners.nonEmpty) 

// Filter the inners 
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a"))) 

Lưu ý rằng chúng tôi chỉ sử dụng thư viện spark-SQL để đọc tệp sàn. Ví dụ, bạn có thể chỉ chọn các cột mong muốn trực tiếp trên DataFrame, trước khi ánh xạ nó tới RDD.

dataFrame.select('col1, 'col2).map { row => ... } 
+1

Cảm ơn bạn Lomig cho câu trả lời chi tiết. Tôi đã đánh dấu nó là một câu trả lời chính xác. Mặc dù chúng tôi chưa ở Spark 1.3, dự định nâng cấp trong tháng này. Có thể thực hiện mà không có API khung dữ liệu trong Spark 1.2 không? Bạn có thể vui lòng cho tôi biết cách getAs [Seq [Row]] (1) hoạt động không? Chỉ mục [1] là vị trí của cột chứa mảng lồng nhau, đúng không? – Tagar

+1

Xem chỉnh sửa của tôi. Với Spark 1.2, bạn có thể sử dụng cùng một mã chính xác cho phép biến đổi từ 'Row' đến lớp chữ hoa của bạn. Vui lòng tham khảo tài liệu chính thức về cú pháp để đọc một tập tin lát gỗ trong các phiên bản cũ, nó rất gần. –

+0

OK. Cảm ơn rất nhiều. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L268 GetSeq [Row] (1) sẽ làm cũng? – Tagar

8

Tôi sẽ đưa ra câu trả lời dựa trên Python vì đó là những gì tôi đang sử dụng. Tôi nghĩ Scala có thứ gì đó tương tự.

Chức năng explode đã được thêm vào trong Spark 1.4.0 để xử lý các mảng lồng nhau trong DataFrames, theo Python API docs.

Tạo một dataframe kiểm tra:

from pyspark.sql import Row 

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])]) 
df.show() 

## +-+--------------------+ 
## |a|    intlist| 
## +-+--------------------+ 
## |1|ArrayBuffer(1, 2, 3)| 
## |2|ArrayBuffer(4, 5, 6)| 
## +-+--------------------+ 

Sử dụng explode để san bằng cột danh sách:

from pyspark.sql.functions import explode 

df.select(df.a, explode(df.intlist)).show() 

## +-+---+ 
## |a|_c0| 
## +-+---+ 
## |1| 1| 
## |1| 2| 
## |1| 3| 
## |2| 4| 
## |2| 5| 
## |2| 6| 
## +-+---+ 
+0

Cảm ơn dnlbrky. Nó có vẻ đơn giản để đọc hơn Scala. Tôi chắc chắn sẽ thử ví dụ python của bạn .. Chúng tôi có lẽ sẽ không có Spark 1.4 mặc dù cho đến khi cuối năm nay một khi Cloudera phát hành CDH 5.5 :-) Hy vọng sẽ có Spark 1.5 bởi thời gian đó. – Tagar

3

cách tiếp cận khác sẽ sử dụng mô hình kết hợp như thế này:

val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
    case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match { 
    case List(a:String, b: String) => (a, b) 
    }).toList 
}) 

Bạn có thể mẫu khớp trực tiếp trên Hàng nhưng có thể sẽ không thành công vì một vài lý do.

0

Câu trả lời trên là tất cả các câu trả lời tuyệt vời và giải quyết câu hỏi này từ các phía khác nhau; Spark SQL cũng là cách khá hữu ích để truy cập dữ liệu lồng nhau.

Đây là ví dụ về cách sử dụng explode() trong SQL trực tiếp để truy vấn tập hợp lồng nhau.

SELECT hholdid, tsp.person_seq_no 
FROM ( SELECT hholdid, explode(tsp_ids) as tsp 
     FROM disc_mrt.unified_fact uf 
    ) 

tsp_ids là một lồng ghép các cấu trúc, có nhiều thuộc tính, bao gồm person_seq_no mà tôi đang chọn trong truy vấn bên ngoài ở trên.

Ở trên đã được thử nghiệm trong Spark 2.0. Tôi đã làm một thử nghiệm nhỏ và nó không hoạt động trong Spark 1.6. Câu hỏi này được hỏi khi Spark 2 không ở xung quanh, vì vậy câu trả lời này thêm vào danh sách các tùy chọn có sẵn để xử lý các cấu trúc lồng nhau.

đáng chú ý JIRAs không được giải quyết trên explode() để truy cập SQL:

Các vấn đề liên quan