2015-01-13 14 views
5

Tôi đang cố gắng đọc tệp json này vào bảng hive, các phím mức cao nhất tức là 1,2 .., ở đây không nhất quán.đọc khóa-giá trị json với hive/sql và spark

{ 
    "1":"{\"time\":1421169633384,\"reading1\":130.875969,\"reading2\":227.138275}", 
    "2":"{\"time\":1421169646476,\"reading1\":131.240628,\"reading2\":226.810211}", 
    "position": 0 
} 

Tôi chỉ cần thời gian và số đọc 1,2 trong bảng tổ ong của tôi làm cột bỏ qua vị trí. Tôi cũng có thể thực hiện kết hợp truy vấn hive và tạo mã giảm bản đồ. Cảm ơn sự giúp đỡ của bạn.

Update, đây là những gì tôi đang cố gắng

val hqlContext = new HiveContext(sc) 

val rdd = sc.textFile(data_loc) 

val json_rdd = hqlContext.jsonRDD(rdd) 
json_rdd.registerTempTable("table123") 
println(json_rdd.printSchema()) 
hqlContext.sql("SELECT json_val from table123 lateral view explode_map(json_map(*, 'int,string')) x as json_key, json_val ").foreach(println) 

Nó ném các lỗi sau:

Exception in thread "main" org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: SELECT json_val from temp_hum_table lateral view explode_map(json_map(*, 'int,string')) x as json_key, json_val 
    at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
+0

Ví dụ về những gì bạn muốn đầu ra trông như thế sẽ cực kỳ hữu ích. – gobrewers14

+0

ví dụ về bảng kết quả đầu ra: '" thời gian "," đọc1 "," đọc2 "\ n 1421169633384, 130.875969, 227.138275 \ n 1421169646476, 131.240628, 226.810211' – venuktan

Trả lời

4

này sẽ làm việc, nếu bạn đổi tên "1" và "2" (tên key) thành "x1" và "x2" (bên trong tệp json hoặc trong bảng):

val resultrdd = sqlContext.sql("SELECT x1.time, x1.reading1, x1.reading1, x2.time, x2.reading1, x2.reading2 from table123 ") 
resultrdd.flatMap(row => (Array((row(0),row(1),row(2)), (row(3),row(4),row(5))))) 

Điều này sẽ cung cấp cho bạn RDD bộ dữ liệu với thời gian, đọc1 và đọc2. Nếu bạn cần một SchemaRDD, bạn sẽ lập bản đồ nó vào một lớp trường hợp bên chuyển đổi flatMap, như thế này:

case class Record(time: Long, reading1: Double, reading2: Double) 
resultrdd.flatMap(row => (Array(Record(row.getLong(0),row.getDouble(1),row.getDouble(2)), 
     Record(row.getLong(3),row.getDouble(4),row.getDouble(5)) ))) 
val schrdd = sqlContext.createSchemaRDD(resultrdd) 

Cập nhật:

Trong trường hợp nhiều phím lồng nhau, bạn có thể phân tích hàng như thế này:

val allrdd = sqlContext.sql("SELECT * from table123") 
allrdd.flatMap(row=>{ 
    var recs = Array[Record](); 
    for(col <- (0 to row.length-1)) { 
     row(col) match { 
      case r:Row => recs = recs :+ Record(r.getLong(2),r.getDouble(0),r.getDouble(1)); 
      case _ => ; 
     } 
    }; 
    recs 
}) 
+0

các phím sẽ chuyển sang tất cả 1,2 ... 240. do đó, làm x1.time và như vậy có thể không hoạt động. – venuktan

+0

Tôi đã cập nhật câu trả lời của mình – pzecevic

+0

Đây là những gì tôi đã làm sai lỗi 'allrdd.registerTempTable ("vals"); sqlContext.sql ("select reading1 from vals LIMIT 10") .collect.foreach (println) 'Tôi có thiếu gì đó không? – venuktan

Các vấn đề liên quan