12

Mô tả

Chúng tôi có một Spark streaming 1.5.2 ứng dụng trong Scala mà đọc JSON sự kiện từ một Stream Kinesis, hiện một số biến đổi/Sự quy tụ và ghi kết quả vào các tiền tố S3 khác nhau. Khoảng thời gian hàng loạt hiện tại là 60 giây. Chúng tôi có 3000-7000 sự kiện/giây. Chúng tôi đang sử dụng điểm kiểm tra để bảo vệ chúng tôi khỏi bị mất tập hợp.vấn đề đáng tin cậy với checkpointing/WAL trong Spark Truyền 1.6.0

Đã hoạt động tốt trong một thời gian, khôi phục từ trường hợp ngoại lệ và thậm chí khởi động lại cụm. Gần đây, chúng tôi đã biên dịch lại mã cho Spark Streaming 1.6.0, chỉ thay đổi phụ thuộc thư viện trong tệp build.sbt. Sau khi chạy mã trong cụm Spark 1.6.0 trong vài giờ, chúng tôi đã nhận thấy những điều sau:

  1. Biến động "Tốc độ đầu vào" và "Thời gian xử lý" đã tăng đáng kể (xem ảnh chụp màn hình bên dưới) trong 1.6. 0.
  2. Vài giờ một lần, có một '' ngoại lệ được ném trong khi viết bản ghi: BlockAdditionEvent ... vào WriteAheadLog. java.util.concurrent.TimeoutException: Tương lai đã hết thời gian chờ sau [5000 mili giây] ngoại lệ (xem dấu vết ngăn xếp hoàn thành bên dưới) trùng với sự giảm xuống 0 sự kiện/giây đối với các lô cụ thể (phút).

Sau khi thực hiện một số thao tác, tôi nghĩ vấn đề thứ hai có liên quan đến số Pull Request này. Mục tiêu ban đầu của PR: “Khi sử dụng S3 làm thư mục cho WALs, quá trình ghi mất quá nhiều thời gian. Trình điều khiển sẽ rất dễ bị tắc nghẽn khi nhiều người nhận gửi các sự kiện AddBlock đến ReceiveTracker. PR này bổ sung thêm các sự kiện trong ReceivedBlockTracker để người nhận không bị người điều khiển chặn quá lâu. ”

Chúng tôi đang kiểm tra S3 trong Spark 1.5.2 và không có vấn đề về hiệu suất/độ tin cậy. Chúng tôi đã kiểm tra điểm kiểm tra trong Spark 1.6.0 trong S3 và NAS cục bộ và trong cả hai trường hợp, chúng tôi nhận được ngoại lệ này. Có vẻ như khi mất hơn 5 giây để kiểm tra một lô, ngoại lệ này phát sinh và chúng tôi đã kiểm tra xem các sự kiện cho lô đó có bị mất vĩnh viễn không.

Câu hỏi

  • là sự gia tăng “Tỷ lệ đầu vào” và “Xử lý thời gian” biến động dự kiến ​​trong Spark streaming 1.6.0 và là có cách nào biết cải thiện nó?

  • Bạn có biết về bất kỳ workaround ngoài những 2 ?:

    1) Để đảm bảo rằng phải mất ít hơn 5 giây cho bồn rửa checkpointing để viết tất cả các file. Theo kinh nghiệm của tôi, bạn không thể đảm bảo rằng với S3, ngay cả đối với các lô nhỏ. Đối với NAS địa phương, nó phụ thuộc vào ai chịu trách nhiệm về cơ sở hạ tầng (khó khăn với các nhà cung cấp dịch vụ đám mây).

    2) Tăng giá trị thuộc tính spark.streaming.driver.writeAheadLog.batchingTimeout.

  • Bạn có dự định mất mọi sự kiện trong kịch bản được mô tả không? Tôi nghĩ rằng nếu việc kiểm tra lô không thành công, số thứ tự của bộ nhận/phát sẽ không được tăng lên và nó sẽ được thử lại sau.

Spark 1.5.2 Thống kê - Ảnh chụp màn hình

enter image description here

Spark 1.6.0 Thống kê - Ảnh chụp màn hình

enter image description here

Full Stack Trace

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog. 
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:107) 
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81) 
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232) 
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500) 
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Source Code Trích xuất

... 
    // Function to create a new StreamingContext and set it up 
    def setupContext(): StreamingContext = { 
    ... 
    // Create a StreamingContext 
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds)) 

    // Create a Kinesis DStream 
    val data = KinesisUtils.createStream(ssc, 
     kinesisAppName, kinesisStreamName, 
     kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(), 
     InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds), 
     StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey) 
... 
    ssc.checkpoint(checkpointDir) 

    ssc 
    } 


    // Get or create a streaming context. 
    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext) 

    ssc.start() 
    ssc.awaitTermination() 
+3

mô tả hay 1 –

+0

intersting. Bạn đã thử giảm kích thước hàng loạt chưa? Làm thế nào để bạn bảo vệ chống lại một nhà sản xuất thử lại? –

+0

Điều đó sẽ hữu ích nếu chúng tôi không có một số ràng buộc đầu ra: Chúng tôi đang viết thư cho S3, các RDD được xử lý khác nhau. Với nhiều công nhân, chế biến và giảm thời gian đầu ra, nhưng cũng làm tăng chi phí. Chơi với tỷ lệ phân vùng trên mỗi công nhân, giúp. Điều chỉnh spark.streaming.blockInterval giúp, vì nó cho phép bạn kiểm soát số lượng phân vùng gián tiếp, mà không gây ra sự cải tổ mà bạn nhận được với phân vùng() ở mức DStream. coalesce(), ở cấp độ RDD, cũng giúp ích. Bạn có ý nghĩa gì với "bảo vệ chống lại sự thử lại của nhà sản xuất"? Thực thi chính xác ngữ nghĩa giao hàng một lần trên đầu ra để ngăn chặn các bản sao? – MiguelPeralvo

Trả lời

4

Theo đề xuất của zero323 về việc đăng nhận xét của tôi làm câu trả lời:

Tăng spark.streaming.driver.writeAheadLog.batchingTimeout giải quyết vấn đề thời gian chờ kiểm tra. Chúng tôi đã làm nó sau khi chắc chắn rằng chúng tôi đã có chỗ cho nó. Chúng tôi đã thử nghiệm nó một lúc rồi. Vì vậy, tôi chỉ khuyên bạn nên tăng nó sau khi xem xét cẩn thận.

CHI TIẾT

Chúng tôi sử dụng các thiết lập 2 trong $ SPARK_HOME/conf/spark-defaults.conf:

spark.streaming.driver.writeAheadLog.allowBatching đúng spark.streaming.driver.writeAheadLog .batchingTimeout 15000

Ban đầu, chúng tôi chỉ có spark.streaming.driver.writeAheadLog.allowBatching được đặt thành true.

Trước khi thay đổi, chúng tôi đã sao chép vấn đề được đề cập trong câu hỏi ("... ReceivedBlockTracker: Ngoại lệ được ném khi đang ghi bản ghi ...") trong môi trường thử nghiệm. Nó xảy ra vài giờ một lần. Sau khi thay đổi, vấn đề đã biến mất. Chúng tôi chạy nó trong vài ngày trước khi chuyển sang sản xuất.

Chúng tôi đã phát hiện ra rằng lớp getBatchingTimeout() method of the WriteAheadLogUtils đã có một giá trị mặc định của 5000ms, như đã thấy ở đây:

def getBatchingTimeout(conf: SparkConf): Long = { 
    conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000) 
} 
Các vấn đề liên quan