Không có phép thuật trong trường hợp thu thập lồng nhau. Spark sẽ xử lý cùng một cách RDD[(String, String)]
và RDD[(String, Seq[String])]
.
Đọc bộ sưu tập lồng nhau như vậy từ tệp Parquet có thể khó khăn.
Hãy lấy một ví dụ từ spark-shell
(1.3.1):
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
Viết file gỗ:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
đọc file gỗ:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
Điều quan trọng một phần là row.getAs[Seq[Row]](1)
. Biểu diễn nội bộ của một chuỗi lồng nhau là struct
là ArrayBuffer[Row]
, bạn có thể sử dụng bất kỳ loại siêu dữ liệu nào thay thế cho số đó thay vì Seq[Row]
. 1
là chỉ mục cột ở hàng bên ngoài. Tôi đã sử dụng phương pháp getAs
ở đây nhưng có các lựa chọn thay thế trong các phiên bản mới nhất của Spark. Xem mã nguồn của số Row trait.
Bây giờ bạn có RDD[Outer]
, bạn có thể áp dụng bất kỳ chuyển đổi hoặc hành động mong muốn nào.
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
Lưu ý rằng chúng tôi chỉ sử dụng thư viện spark-SQL để đọc tệp sàn. Ví dụ, bạn có thể chỉ chọn các cột mong muốn trực tiếp trên DataFrame, trước khi ánh xạ nó tới RDD.
dataFrame.select('col1, 'col2).map { row => ... }
Cảm ơn bạn Lomig cho câu trả lời chi tiết. Tôi đã đánh dấu nó là một câu trả lời chính xác. Mặc dù chúng tôi chưa ở Spark 1.3, dự định nâng cấp trong tháng này. Có thể thực hiện mà không có API khung dữ liệu trong Spark 1.2 không? Bạn có thể vui lòng cho tôi biết cách getAs [Seq [Row]] (1) hoạt động không? Chỉ mục [1] là vị trí của cột chứa mảng lồng nhau, đúng không? – Tagar
Xem chỉnh sửa của tôi. Với Spark 1.2, bạn có thể sử dụng cùng một mã chính xác cho phép biến đổi từ 'Row' đến lớp chữ hoa của bạn. Vui lòng tham khảo tài liệu chính thức về cú pháp để đọc một tập tin lát gỗ trong các phiên bản cũ, nó rất gần. –
OK. Cảm ơn rất nhiều. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L268 GetSeq [Row] (1) sẽ làm cũng? – Tagar