Tôi đang cố gắng tổng hợp luồng với hai cửa sổ khác nhau và in nó vào bảng điều khiển. Tuy nhiên chỉ có truy vấn trực tuyến đầu tiên đang được in. tenSecsQ
không được in trên bảng điều khiển.Thực hiện các truy vấn trực tuyến riêng biệt trong phát trực tuyến có cấu trúc tia lửa
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCountWindowed")
.config("spark.master", "local[*]")
.getOrCreate();
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load();
Dataset<Row> words = lines
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
.toDF("word", "timestamp");
// 5 second window
Dataset<Row> fiveSecs = words
.groupBy(
functions.window(words.col("timestamp"), "5 seconds"),
words.col("word")
).count().orderBy("window");
// 10 second window
Dataset<Row> tenSecs = words
.groupBy(
functions.window(words.col("timestamp"), "10 seconds"),
words.col("word")
).count().orderBy("window");
Truy vấn phát trực tuyến cho cả luồng tổng hợp 5 và 10. Đầu ra cho luồng 10s không được in. Chỉ 5 giây được in vào bảng điều khiển
// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
.queryName("5_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
.queryName("10_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
tenSecsQ.awaitTermination();
Thực ra, tôi không biết cách hoạt động của luồng ổ cắm nhưng với tôi dường như luồng Spark đầu tiên của bạn đọc tất cả dữ liệu từ luồng ổ cắm và không có gì còn lại cho luồng thứ hai. –