2017-08-10 18 views
6

Tôi đang cố gắng tổng hợp luồng với hai cửa sổ khác nhau và in nó vào bảng điều khiển. Tuy nhiên chỉ có truy vấn trực tuyến đầu tiên đang được in. tenSecsQ không được in trên bảng điều khiển.Thực hiện các truy vấn trực tuyến riêng biệt trong phát trực tuyến có cấu trúc tia lửa

SparkSession spark = SparkSession 
    .builder() 
    .appName("JavaStructuredNetworkWordCountWindowed") 
    .config("spark.master", "local[*]") 
    .getOrCreate(); 

Dataset<Row> lines = spark 
    .readStream() 
    .format("socket") 
    .option("host", host) 
    .option("port", port) 
    .option("includeTimestamp", true) 
    .load(); 

Dataset<Row> words = lines 
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) 
    .toDF("word", "timestamp"); 

// 5 second window 
Dataset<Row> fiveSecs = words 
    .groupBy(
     functions.window(words.col("timestamp"), "5 seconds"), 
     words.col("word") 
    ).count().orderBy("window"); 

// 10 second window 
Dataset<Row> tenSecs = words 
    .groupBy(
      functions.window(words.col("timestamp"), "10 seconds"), 
      words.col("word") 
    ).count().orderBy("window"); 

Truy vấn phát trực tuyến cho cả luồng tổng hợp 5 và 10. Đầu ra cho luồng 10s không được in. Chỉ 5 giây được in vào bảng điều khiển

// Start writeStream() for 5s window 
StreamingQuery fiveSecQ = fiveSecs.writeStream() 
    .queryName("5_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

// Start writeStream() for 10s window 
StreamingQuery tenSecsQ = tenSecs.writeStream() 
    .queryName("10_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

tenSecsQ.awaitTermination(); 
+0

Thực ra, tôi không biết cách hoạt động của luồng ổ cắm nhưng với tôi dường như luồng Spark đầu tiên của bạn đọc tất cả dữ liệu từ luồng ổ cắm và không có gì còn lại cho luồng thứ hai. –

Trả lời

5

Tôi đã điều tra câu hỏi này.

Tóm tắt: Mỗi truy vấn trong Streaming có cấu trúc tiêu thụ dữ liệu source. Nguồn socket tạo ra một kết nối mới cho mỗi truy vấn được xác định. Hành vi được thấy trong trường hợp này là do nc chỉ phân phối dữ liệu đầu vào cho kết nối đầu tiên.

Do đó, không thể xác định nhiều tập hợp trên kết nối socket trừ khi chúng tôi có thể đảm bảo rằng nguồn socket được kết nối cung cấp cùng một dữ liệu cho mỗi kết nối mở.


Tôi đã thảo luận câu hỏi này trên danh sách gửi thư Spark. Nhà phát triển Databricks Shixiong Zhu đã trả lời:

Spark tạo một kết nối cho mỗi truy vấn. Hành vi bạn quan sát là vì cách "nc -lk" hoạt động. Nếu bạn sử dụng netstat để kiểm tra kết nối tcp, bạn sẽ thấy có hai kết nối khi bắt đầu hai truy vấn. Tuy nhiên, "nc" chuyển tiếp đầu vào chỉ cho một kết nối.

tôi xác nhận hành vi này bằng cách định nghĩa một thí nghiệm nhỏ: Trước tiên, tôi đã tạo ra một SimpleTCPWordServer mang lại từ ngẫu nhiên để mỗi kết nối mở và một công việc streaming cấu trúc cơ bản mà tuyên bố hai truy vấn. Sự khác biệt duy nhất giữa họ là các truy vấn thứ 2 định nghĩa thêm một cột liên tục để phân biệt sản lượng của nó:

val lines = spark 
    .readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", "9999") 
    .option("includeTimestamp", true) 
    .load() 

val q1 = lines.writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("5 seconds")) 
    .start() 

val q2 = lines.withColumn("foo", lit("foo")).writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("7 seconds")) 
    .start() 

Nếu StructuredStreaming sẽ tiêu thụ chỉ có một dòng, sau đó chúng ta sẽ thấy những lời tương tự cung cấp bởi cả hai truy vấn. Trong trường hợp mỗi truy vấn tiêu thụ một luồng riêng biệt, thì chúng tôi sẽ có các từ khác nhau được báo cáo bởi mỗi truy vấn.

Đây là đầu ra quan sát:

------------------------------------------- 
Batch: 0 
------------------------------------------- 
+--------+-------------------+ 
| value|   timestamp| 
+--------+-------------------+ 
|champion|2017-08-14 13:54:51| 
+--------+-------------------+ 

+------+-------------------+---+ 
| value|   timestamp|foo| 
+------+-------------------+---+ 
|belong|2017-08-14 13:54:51|foo| 
+------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+-------+-------------------+---+ 
| value|   timestamp|foo| 
+-------+-------------------+---+ 
| agenda|2017-08-14 13:54:52|foo| 
|ceiling|2017-08-14 13:54:52|foo| 
| bear|2017-08-14 13:54:53|foo| 
+-------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+----------+-------------------+ 
|  value|   timestamp| 
+----------+-------------------+ 
| breath|2017-08-14 13:54:52| 
|anticipate|2017-08-14 13:54:52| 
| amazing|2017-08-14 13:54:52| 
| bottle|2017-08-14 13:54:53| 
| calculate|2017-08-14 13:54:53| 
|  asset|2017-08-14 13:54:54| 
|  cell|2017-08-14 13:54:54| 
+----------+-------------------+ 

Chúng ta có thể thấy rõ rằng các dòng cho mỗi truy vấn khác nhau. Có vẻ như không thể xác định nhiều tập hợp trên dữ liệu được cung cấp bởi socket source trừ khi chúng tôi có thể đảm bảo rằng máy chủ phụ trợ TCP cung cấp chính xác cùng một dữ liệu cho từng kết nối mở.

Các vấn đề liên quan