2016-10-14 18 views
6

Tôi mới dùng cả Scala và Spark, vì vậy hy vọng ai đó có thể cho tôi biết tôi đang đi sai ở đâu.Spark "CodeGenerator: không biên dịch được" với Dataset.groupByKey

Tôi có bộ dữ liệu ba cột (id, name, year) và tôi muốn tìm năm gần nhất cho mỗi tên. Nói cách khác:

BEFORE           AFTER 
| id_1 | name_1 | 2015 |      | id_2 | name_1 | 2016 | 
| id_2 | name_1 | 2016 |      | id_4 | name_2 | 2015 | 
| id_3 | name_1 | 2014 | 
| id_4 | name_2 | 2015 | 
| id_5 | name_2 | 2014 | 

Tôi nghĩ groupByKeyreduceGroups sẽ có được công việc thực hiện:

val latestYears = ds 
    .groupByKey(_.name) 
    .reduceGroups((left, right) => if (left.year > right.year) left else right) 
    .map(group => group._2) 

Nhưng nó mang lại cho lỗi này, và spits ra rất nhiều mã Java tạo:

ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 21, Column 101: Unknown variable or type "value4" 

Thật thú vị, nếu tôi tạo tập dữ liệu chỉ với tên và cột năm, nó hoạt động như mong đợi.


Dưới đây là đầy đủ mã Tôi đang chạy:

object App { 

    case class Record(id: String, name: String, year: Int) 

    def main(args: Array[String]) { 
    val spark = SparkSession.builder().master("local").appName("test").getOrCreate() 
    import spark.implicits._ 

    val ds = spark.createDataset[String](Seq(
     "id_1,name_1,2015", 
     "id_2,name_1,2016", 
     "id_3,name_1,2014", 
     "id_4,name_2,2015", 
     "id_5,name_2,2014" 
    )) 
     .map(line => { 
     val fields = line.split(",") 
     new Record(fields(0), fields(1), fields(2).toInt) 
     }) 

    val latestYears = ds 
     .groupByKey(_.name) 
     .reduceGroups((left, right) => if (left.year > right.year) left else right) 
     .map(group => group._2) 

    latestYears.show() 
    } 


} 

EDIT: Tôi tin rằng đây có thể là một lỗi với Spark v2.0.1. Sau khi hạ cấp xuống v2.0.0, điều này không còn xảy ra nữa.

+0

Cùng một vấn đề ở đây, tôi đã giải quyết vấn đề bằng cách chuyển đổi reduceGroups(). Map (_._ 2) thành mapGroups (_. Reduce (_._ 2)). Bạn đã báo cáo sự cố này với trình theo dõi danh sách gửi thư/phát hành theo dõi tia lửa chưa? –

+0

Đó có thể là một lỗi, nhưng nhiều hơn nữa _concerned_ với mã chính nó. Tại sao bạn không sử dụng 'groupBy' và' max' trên 'year'? Nó sử dụng DataFrame API không được định dạng (không Dataset) mặc dù. Bất kỳ lý do cụ thể nào? –

Trả lời

0

Các chức năng groupByreduceGroups của bạn là experimental. Tại sao không sử dụng reduceByKey (api)?

Ưu điểm:

  • Nó phải là dễ dàng để dịch từ mã mà bạn có.
  • Nó ổn định hơn (không phải thử nghiệm).
  • Nó sẽ hiệu quả hơn vì nó không yêu cầu trộn hoàn toàn tất cả các mục trong mỗi nhóm (cũng có thể tạo ra tốc độ I/O mạng và làm tràn bộ nhớ trong một nút).
Các vấn đề liên quan