Tôi đang sử dụng Spark 1.3 để thực hiện tổng hợp trên nhiều dữ liệu. Công việc bao gồm 4 bước sau:Tại sao "Lỗi giao tiếp với MapOutputTracker" được báo cáo khi Spark cố gắng gửi GetMapOutputStatuses?
- Đọc một (1TB) tập tin chuỗi lớn (tương ứng với 1 ngày của dữ liệu)
- Lọc ra nhất của nó và nhận được khoảng 1GB ngẫu nhiên ghi
- keyBy khách hàng
- aggregateByKey() cho cấu trúc tùy chỉnh tạo hồ sơ cho khách hàng đó, tương ứng với HashMap [Long, Float] cho mỗi khách hàng. Các phím dài là duy nhất và không bao giờ lớn hơn 50K mục riêng biệt.
Tôi đang chạy này với cấu hình này:
--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \
Và nhận được lỗi này:
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
...
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
... 21 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
Công việc và logic đã được chứng minh để làm việc với một bộ kiểm tra nhỏ và Tôi thậm chí có thể chạy công việc này cho một số ngày nhưng không phải cho những người khác. Tôi đã googled xung quanh và tìm thấy gợi ý rằng "Lỗi giao tiếp với MapOutputTracker" có liên quan đến tin nhắn Spark nội bộ, nhưng tôi đã tăng "spark.akka.frameSize", "spark.akka.timeout" và "spark.akka.askTimeout" (điều cuối cùng này thậm chí không xuất hiện trên tài liệu Spark, nhưng đã được đề cập trong danh sách gửi thư của Spark), không có kết quả. Vẫn còn một số thời gian chờ xảy ra vào lúc 30 giây mà tôi không có đầu mối làm thế nào để xác định hoặc sửa chữa.
Tôi thấy không có lý do nào để lỗi này do kích thước dữ liệu, vì hoạt động lọc và thực tế là aggregateByKey thực hiện tổng hợp cục bộ cục bộ nên đủ để giải quyết kích thước dữ liệu. Số lượng tác vụ là 16K (tự động từ đầu vào ban đầu), nhiều hơn 800 lõi đang chạy này, trên 100 người thực thi, vì vậy nó không đơn giản như mẹo "phân vùng tăng dần" thông thường. Bất kỳ manh mối nào cũng sẽ được đánh giá cao! Cảm ơn!
'16/13/01 13:19:30 WARN util.AkkaUtils: Lỗi gửi tin nhắn [message = GetMapOutputStatuses (214)] trong 1 nỗ lực java.util.concurrent.TimeoutException: Futures timed out sau [30 giây] \t tại scala.concurrent.impl.Promise $ DefaultPromise.ready (Promise.scala: 219) \t tại scala.concurrent.impl.Promise $ DefaultPromise.result (Promise.scala: 223) \t tại scala.concurrent .Await $$ anonfun $ result $ 1.apply (package.scala: 107) \t tại scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn (BlockContext.scala: 53) \t tại scala.concurren t.Chỉ cần $ .result (package.scala: 107) ' – Sam
Tôi không phải đối mặt với vấn đề này mọi lúc. Tôi đang sử dụng tia lửa với dịch vụ amazon kinesis. Tôi đang phải đối mặt với vấn đề này trong thời gian dài nói 20 giờ chạy. Bất kỳ trợ giúp như làm thế nào để gỡ lỗi nó hơn nữa sẽ được giúp đỡ rất nhiều. Cảm ơn – Sam
Hi Sam. Tôi vẫn không có một câu trả lời rõ ràng cho việc này, nhưng chơi với thực sự *** giảm *** số lượng phân vùng. Một số lượng công việc nhỏ hơn dường như đòi hỏi ít bộ nhớ đồng bộ hơn và đôi khi hoạt động. –