2016-12-02 17 views
7

Tôi có một luồng có cấu trúc được thiết lập đang chạy tốt, nhưng tôi đã hy vọng theo dõi nó trong khi nó đang chạy.Giám sát cấu trúc Streaming

tôi đã xây dựng được một EventCollector

class EventCollector extends StreamingQueryListener{ 
    override def onQueryStarted(event: QueryStartedEvent): Unit = { 
    println("Start") 
    } 

    override def onQueryProgress(event: QueryProgressEvent): Unit = { 
    println(event.queryStatus.prettyJson) 
    } 

    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = { 
    println("Term") 
    } 

tôi đã xây dựng được một EventCollector và thêm người nghe đến phiên spark tôi

val listener = new EventCollector() 
spark.streams.addListener(listener) 

Sau đó, tôi bắn ra các truy vấn

val query = inputDF.writeStream 
    //.format("console") 
    .queryName("Stream") 
    .foreach(writer) 
    .start() 

query.awaitTermination() 

Tuy nhiên , onQueryProgress không bao giờ bị tấn công. onQueryStarted, nhưng tôi đã hy vọng có được tiến trình của truy vấn tại một khoảng thời gian nhất định để theo dõi các truy vấn đang hoạt động như thế nào. Bất cứ ai có thể hỗ trợ điều này?

+0

Theo dõi Hội nghị thượng đỉnh mới nhất của Spark. Chúng tôi đã học được rằng một số thông tin chúng tôi đang tìm kiếm có thể được tìm thấy trong các tập tin kiểm tra tia lửa. –

Trả lời

3

Sau nhiều nghiên cứu về chủ đề này, đây là những gì tôi đã tìm thấy ...

OnQueryProgress bị truy cập giữa các truy vấn. Tôi không chắc chắn chức năng có chủ ý này hay không, nhưng trong khi chúng tôi đang truyền dữ liệu từ một tệp, thì OnQueryProgress không kích hoạt.

Một giải pháp tôi đã tìm thấy là dựa vào bồn rửa nhà văn foreach và thực hiện phân tích hiệu suất của riêng tôi trong chức năng quy trình. Thật không may, chúng tôi không thể truy cập thông tin cụ thể về truy vấn đang chạy. Hoặc, tôi đã không tìm ra cách nào. Đây là những gì tôi đã thực hiện trong sandbox của tôi để phân tích hiệu suất:

val writer = new ForeachWriter[rawDataRow] { 
    def open(partitionId: Long, version: Long):Boolean = { 
     //We end up here in between files 
     true 
    } 
    def process(value: rawDataRow) = { 
     counter += 1 

     if(counter % 1000 == 0) { 
      val currentTime = System.nanoTime() 
      val elapsedTime = (currentTime - startTime)/1000000000.0 

      println(s"Records Written: $counter") 
      println(s"Time Elapsed: $elapsedTime seconds") 
     } 
    } 
} 

Một cách khác để có được số liệu:

Một cách khác để có được thông tin về các truy vấn chạy là để nhấn endpoint GET tia lửa mà Cung cấp cho chúng tôi.

http://localhost:4040/metrics

hoặc

http://localhost:4040/api/v1/

Tài liệu ở đây: http://spark.apache.org/docs/latest/monitoring.html

Cập nhật Số 02 Tháng Chín 2017: Thử nghiệm trên tia lửa thường xuyên trực tuyến, không có cấu trúc luồng

Tuyên bố từ chối trách nhiệm, điều này có thể không áp dụng cho phát trực tuyến có cấu trúc, tôi cần thiết lập giường thử nghiệm để xác nhận. Tuy nhiên, nó hoạt động với phát tia lửa thông thường (Tiêu thụ từ Kafka trong ví dụ này).

Tôi tin rằng, kể từ khi phát trực tiếp tia lửa 2.2 đã được phát hành, các thiết bị đầu cuối mới tồn tại có thể truy xuất nhiều chỉ số hơn về hiệu suất của luồng. Điều này có thể đã tồn tại trong các phiên bản trước và tôi đã bỏ lỡ nó, nhưng tôi muốn đảm bảo rằng nó đã được ghi lại cho bất kỳ ai khác đang tìm kiếm thông tin này.

http://localhost:4040/api/v1/applications/ {applicationIdHere}/streaming/thống kê

Đây là thiết bị đầu cuối mà có vẻ như nó đã được bổ sung trong 2.2 (Hoặc nó đã tồn tại và được chỉ cần thêm các tài liệu hướng dẫn, tôi không chắc chắn, tôi có không đã kiểm tra).

Anyways, nó bổ sung thêm số liệu ở định dạng này cho các ứng dụng trực tuyến quy định:

{ 
    "startTime" : "2017-09-13T14:02:28.883GMT", 
    "batchDuration" : 1000, 
    "numReceivers" : 0, 
    "numActiveReceivers" : 0, 
    "numInactiveReceivers" : 0, 
    "numTotalCompletedBatches" : 90379, 
    "numRetainedCompletedBatches" : 1000, 
    "numActiveBatches" : 0, 
    "numProcessedRecords" : 39652167, 
    "numReceivedRecords" : 39652167, 
    "avgInputRate" : 771.722, 
    "avgSchedulingDelay" : 2, 
    "avgProcessingTime" : 85, 
    "avgTotalDelay" : 87 
} 

này cho chúng ta khả năng để xây dựng các ứng dụng tùy chỉnh số liệu/giám sát riêng của chúng tôi bằng cách sử dụng thiết bị đầu cuối REST của được tiếp xúc bởi Spark.

+0

Ý của bạn là gì khi "OnQueryProgress bị tấn công giữa các truy vấn". FYI, trong phiên bản Spark 2.1 sắp tới, các sự kiện này sẽ được đăng khi có một đợt chạy và cũng được đăng 10 giây một lần ngay cả khi không có dữ liệu mới đến. – zsxwing

+0

Xin lỗi, ý tôi là chức năng OnQueryProgress được gọi giữa các truy vấn. Những gì chúng tôi muốn là cho OnQueryProgress được gọi trong khi truy vấn đang được tiến hành. Bằng cách đó chúng ta có thể mổ xẻ nó như thế nào. Điều đó vẫn có thể xảy ra, nhưng tôi vẫn chưa tìm ra. –

+1

Tôi hiểu. Khi có một đợt chạy lớn, có thể mất nhiều thời gian để hoàn thành. FileStreamSource có một tùy chọn 'maxFilesPerTrigger' để giới hạn số lượng tệp để xử lý trong mỗi lô. Bạn có thể sử dụng nó để tạo ra các lô nhỏ thay vì một lô lớn. – zsxwing