Tôi đang sử dụng Apache Spark Streaming 1.6.1 để viết một ứng dụng Java tham gia hai luồng dữ liệu Key/Value và ghi đầu ra cho HDFS. Hai luồng dữ liệu chứa các chuỗi K/V và được định kỳ nhập vào trong Spark từ HDFS bằng cách sử dụng textFileStream().Cách truyền dữ liệu qua nhiều khoảng thời gian theo đợt trong Spark Streaming
Hai luồng dữ liệu không được đồng bộ hóa, có nghĩa là một số khóa trong luồng1 tại thời điểm t0 có thể xuất hiện trong luồng2 tại thời điểm t1 hoặc ngược lại. Do đó, mục tiêu của tôi là tham gia hai luồng và tính toán các phím "còn sót lại", nên được xem xét cho hoạt động nối trong các khoảng thời gian tiếp theo.
Để làm rõ hơn này, hãy nhìn vào các thuật toán sau đây:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
Tôi đã cố gắng để thực hiện thuật toán này với Spark streaming không thành công. Ban đầu, tôi tạo ra hai dòng sản phẩm nào cho các phím còn sót lại theo cách này (đây là chỉ có một dòng, nhưng mã để tạo ra dòng thứ hai là tương tự):
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
Sau đó, dòng trống này là thống nhất (ví dụ, union()) với stream1 và cuối cùng, sau khi nối, tôi thêm các phím còn sót lại từ stream1 và cửa sổ cuộc gọi(). Điều tương tự cũng xảy ra với luồng2.
Vấn đề là các thao tác tạo left_keys_s1 và left_keys_s2 là các phép biến đổi không có hành động, có nghĩa là Spark không tạo ra bất kỳ biểu đồ luồng RDD nào và do đó chúng không bao giờ được thực hiện. Những gì tôi nhận được ngay bây giờ là một kết nối mà chỉ xuất ra các bản ghi có khóa trong stream1 và stream2 trong cùng một khoảng thời gian.
Các bạn có đề xuất nào để triển khai chính xác điều này với Spark không?
Cảm ơn, Marco