2015-01-05 18 views
5

Tôi có 2 RDD, tia lửa dữ liệuRDD và newPairDataRDD được sử dụng cho truy vấn SQL Spark. khi init ứng dụng của tôi, dataRDD sẽ được khởi tạo. Tất cả dữ liệu trong một thực thể hbase được chỉ định sẽ được lưu vào dataRDD.nó rất chậm cho liên kết RDD tia lửa

Khi truy vấn sql của khách hàng đến, APP của tôi sẽ nhận được tất cả các bản cập nhật mới và chèn vào newPairDataRDD. liên kết dataRDD newPairDataRDD và đăng ký dưới dạng bảng trong ngữ cảnh SQL lửa.

Tôi tìm thấy thậm chí 0 bản ghi trong dữ liệuRDD và 1 bản ghi được chèn mới trong newPairDataRDD. Nó sẽ mất 4 giây cho công đoàn. Đó là quá chậm

Tôi nghĩ điều đó không hợp lý. Bất cứ ai cũng biết làm thế nào để làm cho nó nhanh hơn? Cảm ơn mã đơn giản như bên dưới

// Step1: load all data from hbase to dataRDD when initial, this only run once. 
    JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD(); 
    dataRDD.cache(); 
    dataRDD.persist(StorageLevel.MEMORY_ONLY()); 
    logger.info(dataRDD.count()); 

    // Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD 

    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD(); 
    // Step3: if count>0 do union and reduce 

     if(newPairDataRDD.count() > 0) { 

     JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD); 

    // if data was updated in DB, need to delete the old version from the dataRDD. 

     dataRDD = unionedRDD.reduceByKey(
      new Function2<Row, Row, Row>() { 
      // @Override 
      public Row call(Row r1, Row r2) { 
      return r2; 
      } 
      }); 
    } 
//step4: register the dataRDD 
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema); 

//step5: execute sql query 
retRDD = sqlContext.sql(sql); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

Từ trang web tia lửa, tôi có thể xem bên dưới. Rõ ràng nó cần 4s cho đoàn

Các giai đoạn hoàn thành (8)

StageId Mô tả Đăng Nhiệm vụ Thời gian: Kế/Tổng Input shuffle shuffle đọc Viết

6 thu thập tại SparkPlan.scala: 85 + chi tiết 1/4/2015 08:17 2 s 8-Aug 156,0 B

7 công đoàn tại SparkSqlQueryForMarsNew.java:389+details 2015/01/04 08:17 4 s 8-Aug 64,0 B 156,0 B

Trả lời

1

Một hiệu quả hơn cách để đạt được những gì bạn muốn là sử dụng cogroup()flatMapValues(), bằng cách sử dụng một công đoàn rất ít ngoại trừ thêm phân vùng mới vào dataRDD, có nghĩa là tất cả dữ liệu phải được xáo trộn trước reduceByKey(). A cogroup()flatMapValues() sẽ chỉ phân phối lại số newPairDataRDD.

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD); 
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
    new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() { 
     public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) { 
      if (grouped._2.nonEmpty()) { 
       return grouped._2; 
      } else { 
       return grouped._1; 
      } 
     } 
    }); 

Hoặc trong Scala

val unioned = dataRDD.cogroup(newPairDataRDD) 
val updated = unioned.flatMapValues { case (oldVals, newVals) => 
    if (newVals.nonEmpty) newVals else oldVals 
} 

Disclaimer, tôi không sử dụng để viết tia lửa trong Java! Xin vui lòng một người nào đó sửa tôi nếu ở trên là sai!

0

Hãy thử phân vùng lại RDDs của bạn:

JavaPairRDD unionedRDD = dataRDD.repartition (sc.defaultParallelism * 3) .union (newPairDataRDD.repartition (sc.defaultParallelism * 3));

Các vấn đề liên quan