2016-12-09 30 views
5

Tôi có tệp NDJ lồng nhau lớn (JSON phân tách dòng mới) mà tôi cần đọc vào một khung dữ liệu tia lửa đơn và lưu vào sàn gỗ. Trong một nỗ lực để làm cho schema tôi sử dụng chức năng này:Đọc các tệp JSON lớn vào Khung dữ liệu Spark

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { 
     schema.fields.flatMap(f => { 
      val colName = if (prefix == null) f.name else (prefix + "." + f.name) 
      f.dataType match { 
      case st: StructType => flattenSchema(st, colName) 
      case _ => Array(col(colName)) 
      } 
     }) 
    } 

trên dataframe được trả về bằng cách đọc bởi

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

Tôi cũng đã chuyển này để val df = spark.read.json(path) để chỉ tác phẩm này với NDJ và không phải là nhiều dòng JSON - cùng một lỗi.

Điều này gây ra lỗi bộ nhớ trên công nhân java.lang.OutOfMemoryError: Java heap space.

Tôi đã thay đổi các tùy chọn bộ nhớ JVM và các tùy chọn spark người thi hành/điều khiển vô ích

Có cách nào để dòng tập tin, san bằng lược đồ, và thêm vào một dataframe từng bước? Một số dòng của JSON chứa các trường mới từ các entires trước đó ... vì vậy chúng sẽ cần được điền vào sau.

Trả lời

0

Bạn có thể đạt được điều này theo nhiều cách.

Đầu tiên khi đọc, bạn có thể cung cấp giản đồ cho khung dữ liệu để đọc json hoặc bạn có thể cho phép tia lửa tự suy ra lược đồ.

Khi json đang ở trong khung dữ liệu, bạn có thể làm theo các cách sau để làm phẳng nó.

a. Sử dụng explode() trên dataframe - để làm phẳng nó. b. Sử dụng spark sql và truy cập vào các trường lồng nhau bằng cách sử dụng. nhà điều hành. Bạn có thể tìm thấy các ví dụ here

Cuối cùng, nếu bạn muốn thêm cột mới vào khung dữ liệu a. Tùy chọn đầu tiên, sử dụng withColumn() là một cách tiếp cận. Tuy nhiên, điều này sẽ được thực hiện cho mỗi cột mới được thêm vào và cho toàn bộ tập dữ liệu. b. Sử dụng sql để tạo khung dữ liệu mới từ hiện tại - điều này có thể dễ dàng nhất là c. Cuối cùng, sử dụng bản đồ, sau đó truy cập các phần tử, lấy giản đồ cũ, thêm giá trị mới, tạo giản đồ mới và cuối cùng nhận được df mới - như bên dưới

One withColumn sẽ hoạt động trên toàn bộ rdd. Vì vậy, nói chung nó không phải là một thực hành tốt để sử dụng phương pháp cho mỗi cột bạn muốn thêm. Có một cách mà bạn làm việc với các cột và dữ liệu của chúng bên trong một hàm bản đồ. Vì một hàm bản đồ đang thực hiện công việc ở đây, nên mã để thêm cột mới và dữ liệu của nó sẽ được thực hiện song song.

a. bạn có thể thu thập các giá trị mới dựa trên các tính toán

b. Thêm các giá trị cột mới để RDD chính như sau

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

đây liên tiếp, là tài liệu tham khảo của dòng trong phương pháp bản đồ

c. Tạo giản đồ mới như bên dưới

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d. Thêm vào lược đồ cũ

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e.Tạo khung dữ liệu mới với các cột mới

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

Làm cách nào để giải quyết 'java.lang.OutOfMemoryError' là kết quả của' toàn tập tinFiles'? –

+0

Tôi đã giải quyết "Có cách nào để truyền tệp, làm phẳng lược đồ, và thêm vào một khung dữ liệu từng bước? Một số dòng của JSON chứa các trường mới từ các entires trước đó ... vì vậy chúng sẽ được điền vào sau. ". Tôi không thấy câu hỏi nào liên quan đến độ phân giải vấn đề bộ nhớ. Vì vậy, đã cho anh ta nhiều cách tiếp cận. – Ramzy

+0

Nếu NDJ là JSONL thì OP không nên sử dụng hàm fullTextFiles. Nếu nó không phải là điều này sẽ không giúp đỡ. –

2

Không hoạt động. Vấn đề là với giới hạn đối tượng JVM. Tôi đã kết thúc bằng cách sử dụng một trình phân tích cú pháp scala json và xây dựng khung dữ liệu theo cách thủ công.

Các vấn đề liên quan