2015-09-11 17 views
7

Tôi đang chạy công việc Spark để tổng hợp dữ liệu. Tôi có cấu trúc dữ liệu tùy chỉnh được gọi là Tiểu sử, về cơ bản chứa một số mutable.HashMap[Zone, Double]. Tôi muốn kết hợp tất cả các hồ sơ chia sẻ một chìa khóa nhất định (UUID), với đoạn mã sau:Phạm vi 'spark.driver.maxResultSize'

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1} 
val aggregated = dailyProfiles 
    .aggregateByKey(new Profile(), 3200)(merge, merge).cache() 

Tò mò, Spark không thành công với các lỗi sau:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Các giải pháp rõ ràng là để tăng "spark.driver.maxResultSize", nhưng có hai điều khiến tôi khó hiểu.

  1. Quá nhiều của một trùng hợp ngẫu nhiên mà tôi nhận được 1024,0 lớn hơn 1024,0
  2. Tất cả các tài liệu hướng dẫn và giúp đỡ tôi thấy googling tham số lỗi và cấu hình đặc biệt này chỉ ra rằng nó ảnh hưởng đến chức năng mà phải mất một giá trị trở lại trình điều khiển. (nói take() hoặc collect()), nhưng tôi không dùng bất cứ điều gì cho người lái xe, chỉ cần đọc từ HDFS, tổng hợp, tiết kiệm trở lại HDFS.

Có ai biết tại sao tôi gặp phải lỗi này không?

+0

bạn có thể xác minh câu trả lời của tôi không? – mrsrinivas

+0

Tôi sẽ upvote nó, nhưng thật đáng buồn tôi không còn có quyền truy cập vào mã (hoặc công ty), cũng không trả lời của bạn giải quyết mảnh # 2, đó là hoạt động không nên xảy ra ở nơi đầu tiên: -S –

Trả lời

1

Yes, It's failing because The values we see in exception message are rounded off by one precision and comparison happening in bytes.

That serialized output must be more than 1024.0 MB and less than 1024.1 MB.

Kiểm tra thêm Đoạn mã Apache Spark, Rất thú vị và rất hiếm khi gặp lỗi này. :)

Ở đây totalResultSize > maxResultSize cả hai loại Dài và giữ giá trị theo byte. Nhưng msg giữ giá trị được làm tròn từ Utils.bytesToString().

//TaskSetManager.scala 
    def canFetchMoreResults(size: Long): Boolean = sched.synchronized { 
    totalResultSize += size 
    calculatedTasks += 1 
    if (maxResultSize > 0 && totalResultSize > maxResultSize) { 
     val msg = s"Total size of serialized results of ${calculatedTasks} tasks " + 
     s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " + 
     s"(${Utils.bytesToString(maxResultSize)})" 
     logError(msg) 
     abort(msg) 
     false 
    } else { 
     true 
    } 
    } 

Apache Spark 1.3 - source


//Utils.scala 
    def bytesToString(size: Long): String = { 
    val TB = 1L << 40 
    val GB = 1L << 30 
    val MB = 1L << 20 
    val KB = 1L << 10 

    val (value, unit) = { 
     if (size >= 2*TB) { 
     (size.asInstanceOf[Double]/TB, "TB") 
     } else if (size >= 2*GB) { 
     (size.asInstanceOf[Double]/GB, "GB") 
     } else if (size >= 2*MB) { 
     (size.asInstanceOf[Double]/MB, "MB") 
     } else if (size >= 2*KB) { 
     (size.asInstanceOf[Double]/KB, "KB") 
     } else { 
     (size.asInstanceOf[Double], "B") 
     } 
    } 
    "%.1f %s".formatLocal(Locale.US, value, unit) 
    } 

Apache Spark 1.3 - source