2015-03-24 14 views
5

Nếu không có cảnh báo deprecation trong spark SQL 1.2.1, đoạn code sau ngừng làm việc trong 1,3Đây có phải là lỗi hồi quy trong Spark 1.3 không?

Làm việc trong 1.2.1 (không có bất kỳ cảnh báo deprecation)

val sqlContext = new HiveContext(sc) 
import sqlContext._ 
val jsonRDD = sqlContext.jsonFile(jsonFilePath) 
jsonRDD.registerTempTable("jsonTable") 

val jsonResult = sql(s"select * from jsonTable") 
val foo = jsonResult.zipWithUniqueId().map { 
    case (Row(...), uniqueId) => // do something useful 
    ... 
} 

foo.registerTempTable("...") 

Ngưng làm việc trong 1.3.0 (đơn giản là không biên dịch, và tất cả tôi đã làm là thay đổi 1,3)

jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method 

không workar làm việc ound:

mặc dù điều này có thể cung cấp cho tôi một RDD [Row]:

jsonResult.rdd.zipWithUniqueId() 

bây giờ điều này sẽ không làm việc như RDD[Row] không có một phương pháp registerTempTable tất nhiên

 foo.registerTempTable("...") 

đây là những câu hỏi của tôi

  1. Có cách giải quyết nào không? (ví dụ: tôi chỉ làm sai?)
  2. Đây có phải là lỗi không? (Tôi nghĩ rằng bất cứ điều gì ngừng biên dịch mà làm việc trong một phiên bản trước đó, mà không có một cảnh báo @deprecated rõ ràng là một lỗi hồi quy)

Trả lời

5

Nó không phải là lỗi, nhưng xin lỗi vì sự nhầm lẫn! Cho đến Spark 1.3, Spark SQL được gán nhãn là Alpha Component vì các API vẫn còn trong thông lượng. Với Spark 1.3, chúng tôi đã tốt nghiệp và ổn định API. Mô tả đầy đủ về những gì bạn cần làm khi chuyển có thể được tìm thấy trong the documentation.

Tôi cũng có thể trả lời câu hỏi cụ thể của bạn và đưa ra một số lý giải về lý do tại sao chúng tôi thực hiện những thay đổi này

Ngưng làm việc trong 1.3.0 (đơn giản là không biên dịch, và tất cả tôi đã làm là thay đổi đến 1,3) jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

DataFrames giờ là một giao diện thống nhất trên cả Scala và Java. Tuy nhiên, vì chúng tôi phải duy trì tính tương thích với API RDD hiện tại cho các phần còn lại của 1.X, DataFrames không phải là RDD s. Để có được đại diện RDD bạn có thể gọi df.rdd hoặc df.javaRDD

Thêm vào đó, vì chúng tôi sợ của một số các rắc rối có thể xảy ra với các chuyển đổi ngầm, chúng tôi đã làm cho nó như vậy mà bạn phải gọi một cách rõ ràng rdd.toDF gây ra việc chuyển đổi từ RDD để xảy ra. Tuy nhiên, chuyển đổi này chỉ hoạt động tự động nếu RDD của bạn giữ các đối tượng kế thừa từ Product (ví dụ: bộ dữ liệu hoặc các kiểu chữ).

Quay lại câu hỏi ban đầu, nếu bạn muốn thực hiện các phép biến đổi trên các hàng có lược đồ tùy ý, bạn cần nói rõ ràng về Spark SQL về cấu trúc của dữ liệu sau khi thao tác bản đồ (vì trình biên dịch không thể).

import org.apache.spark.sql.types._ 
val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil)) 
val newSchema = 
    StructType(
    StructField("uniqueId", IntegerType) +: jsonData.schema.fields) 

val augmentedRows = jsonData.rdd.zipWithUniqueId.map { 
    case (row, id) => 
    Row.fromSeq(id +: row.toSeq) 
} 

val newDF = sqlContext.createDataFrame(augmentedRows, newSchema) 
+0

Cảm ơn! Tôi đoán tôi nên đã đọc hướng dẫn đầu tiên;) https: // spark.apache.org/docs/1.3.0/sql-programming-guide.html#interoperating-with-rdds –

Các vấn đề liên quan