2015-09-09 23 views
14

Tôi đang sử dụng Spark 1.3 để thực hiện tổng hợp trên nhiều dữ liệu. Công việc bao gồm 4 bước sau:Tại sao "Lỗi giao tiếp với MapOutputTracker" được báo cáo khi Spark cố gắng gửi GetMapOutputStatuses?

  1. Đọc một (1TB) tập tin chuỗi lớn (tương ứng với 1 ngày của dữ liệu)
  2. Lọc ra nhất của nó và nhận được khoảng 1GB ngẫu nhiên ghi
  3. keyBy khách hàng
  4. aggregateByKey() cho cấu trúc tùy chỉnh tạo hồ sơ cho khách hàng đó, tương ứng với HashMap [Long, Float] cho mỗi khách hàng. Các phím dài là duy nhất và không bao giờ lớn hơn 50K mục riêng biệt.

Tôi đang chạy này với cấu hình này:

--name geo-extract-$1-askTimeout \ 
--executor-cores 8 \ 
--num-executors 100 \ 
--executor-memory 40g \ 
--driver-memory 4g \ 
--driver-cores 8 \ 
--conf 'spark.storage.memoryFraction=0.25' \ 
--conf 'spark.shuffle.memoryFraction=0.35' \ 
--conf 'spark.kryoserializer.buffer.max.mb=1024' \ 
--conf 'spark.akka.frameSize=1024' \ 
--conf 'spark.akka.timeout=200' \ 
--conf 'spark.akka.askTimeout=111' \ 
--master yarn-cluster \ 

Và nhận được lỗi này:

org.apache.spark.SparkException: Error communicating with MapOutputTracker 
     at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117) 
     at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164) 
     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) 
     ... 
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)] 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) 
     at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) 
     ... 21 more 
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) 

Công việc và logic đã được chứng minh để làm việc với một bộ kiểm tra nhỏ và Tôi thậm chí có thể chạy công việc này cho một số ngày nhưng không phải cho những người khác. Tôi đã googled xung quanh và tìm thấy gợi ý rằng "Lỗi giao tiếp với MapOutputTracker" có liên quan đến tin nhắn Spark nội bộ, nhưng tôi đã tăng "spark.akka.frameSize", "spark.akka.timeout" và "spark.akka.askTimeout" (điều cuối cùng này thậm chí không xuất hiện trên tài liệu Spark, nhưng đã được đề cập trong danh sách gửi thư của Spark), không có kết quả. Vẫn còn một số thời gian chờ xảy ra vào lúc 30 giây mà tôi không có đầu mối làm thế nào để xác định hoặc sửa chữa.

Tôi thấy không có lý do nào để lỗi này do kích thước dữ liệu, vì hoạt động lọc và thực tế là aggregateByKey thực hiện tổng hợp cục bộ cục bộ nên đủ để giải quyết kích thước dữ liệu. Số lượng tác vụ là 16K (tự động từ đầu vào ban đầu), nhiều hơn 800 lõi đang chạy này, trên 100 người thực thi, vì vậy nó không đơn giản như mẹo "phân vùng tăng dần" thông thường. Bất kỳ manh mối nào cũng sẽ được đánh giá cao! Cảm ơn!

+0

'16/13/01 13:19:30 WARN util.AkkaUtils: Lỗi gửi tin nhắn [message = GetMapOutputStatuses (214)] trong 1 nỗ lực java.util.concurrent.TimeoutException: Futures timed out sau [30 giây] \t tại scala.concurrent.impl.Promise $ DefaultPromise.ready (Promise.scala: 219) \t tại scala.concurrent.impl.Promise $ DefaultPromise.result (Promise.scala: 223) \t tại scala.concurrent .Await $$ anonfun $ result $ 1.apply (package.scala: 107) \t tại scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn (BlockContext.scala: 53) \t tại scala.concurren t.Chỉ cần $ .result (package.scala: 107) ' – Sam

+0

Tôi không phải đối mặt với vấn đề này mọi lúc. Tôi đang sử dụng tia lửa với dịch vụ amazon kinesis. Tôi đang phải đối mặt với vấn đề này trong thời gian dài nói 20 giờ chạy. Bất kỳ trợ giúp như làm thế nào để gỡ lỗi nó hơn nữa sẽ được giúp đỡ rất nhiều. Cảm ơn – Sam

+0

Hi Sam. Tôi vẫn không có một câu trả lời rõ ràng cho việc này, nhưng chơi với thực sự *** giảm *** số lượng phân vùng. Một số lượng công việc nhỏ hơn dường như đòi hỏi ít bộ nhớ đồng bộ hơn và đôi khi hoạt động. –

Trả lời

5

Tôi gặp vấn đề tương tự, công việc của tôi sẽ hoạt động tốt với tập dữ liệu nhỏ hơn, nhưng sẽ không thành công với số liệu lớn hơn.

Sau nhiều thay đổi cấu hình, tôi thấy rằng việc thay đổi cài đặt bộ nhớ của trình điều khiển có nhiều tác động hơn thay đổi cài đặt bộ nhớ của trình thực thi. Cũng sử dụng bộ thu gom rác mới giúp ích rất nhiều. Tôi đang sử dụng cấu hình sau cho một cụm 3, với 40 lõi mỗi.Hy vọng các cấu hình sau đây sẽ giúp:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 - 
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g 
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions 

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 - 
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g 
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions 


spark.driver.memory=8g 
spark.driver.cores=10 
spark.driver.maxResultSize=8g 

spark.executor.memory=16g 
spark.executor.cores=25 

spark.default.parallelism=50 
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs 
spark.eventLog.enabled=true 

spark.kryoserializer.buffer=512m 
spark.kryoserializer.buffer.max=1536m 

spark.rdd.compress=true 
spark.storage.memoryFraction=0.15 
spark.storage.MemoryStore=12g 
+0

nếu nó không hoạt động, hãy tạo spark.executor.cores = 2 (một số thấp), bằng cách làm điều đó bạn sẽ có nhiều thùng chứa đang chạy quá trình – sparkDabbler

2

Điều gì đang xảy ra trong trình điều khiển tại thời điểm xảy ra lỗi này? Nó có thể là do áp lực bộ nhớ trên trình điều khiển khiến nó không phản hồi. Nếu tôi nhớ chính xác, MapOutputTracker mà nó đang cố gắng truy cập khi nó gọi GetMapOutputStatuses đang chạy trong trình điều khiển trình điều khiển Spark.

Nếu bạn đang đối mặt với các GC dài hoặc tạm dừng khác vì một lý do nào đó trong quá trình này, điều này sẽ gây ra các ngoại lệ mà bạn nhìn thấy ở trên.

Một số điều cần thử là thử gỡ bỏ quy trình trình điều khiển khi bạn bắt đầu thấy các lỗi này và xem điều gì sẽ xảy ra. Nếu jstack không đáp ứng, nó có thể là trình điều khiển của bạn không đủ đáp ứng.

Nhiệm vụ 16K có vẻ như rất nhiều để người lái xe theo dõi, bất kỳ cơ hội nào bạn có thể tăng bộ nhớ trình điều khiển qua 4g?

Các vấn đề liên quan