2016-05-20 39 views
5

Tôi đang sử dụng Apache Spark Streaming 1.6.1 để viết một ứng dụng Java tham gia hai luồng dữ liệu Key/Value và ghi đầu ra cho HDFS. Hai luồng dữ liệu chứa các chuỗi K/V và được định kỳ nhập vào trong Spark từ HDFS bằng cách sử dụng textFileStream().Cách truyền dữ liệu qua nhiều khoảng thời gian theo đợt trong Spark Streaming

Hai luồng dữ liệu không được đồng bộ hóa, có nghĩa là một số khóa trong luồng1 tại thời điểm t0 có thể xuất hiện trong luồng2 tại thời điểm t1 hoặc ngược lại. Do đó, mục tiêu của tôi là tham gia hai luồng và tính toán các phím "còn sót lại", nên được xem xét cho hoạt động nối trong các khoảng thời gian tiếp theo.

Để làm rõ hơn này, hãy nhìn vào các thuật toán sau đây:

variables: 
stream1 = <String, String> input stream at time t1 
stream2 = <String, String> input stream at time t1 
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0 
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0 

operations at time t1: 
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2) 
write out_stream to HDFS 
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2) 
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2) 

Tôi đã cố gắng để thực hiện thuật toán này với Spark streaming không thành công. Ban đầu, tôi tạo ra hai dòng sản phẩm nào cho các phím còn sót lại theo cách này (đây là chỉ có một dòng, nhưng mã để tạo ra dòng thứ hai là tương tự):

JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context 
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>(); 
q.add(empty_rdd); 
JavaDStream<String> empty_dstream = jssc.queueStream(q); 
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() { 
           @Override 
           public scala.Tuple2<String, String> call(String s) { 
            return new scala.Tuple2(s, s); 
           } 
           }); 

Sau đó, dòng trống này là thống nhất (ví dụ, union()) với stream1 và cuối cùng, sau khi nối, tôi thêm các phím còn sót lại từ stream1 và cửa sổ cuộc gọi(). Điều tương tự cũng xảy ra với luồng2.

Vấn đề là các thao tác tạo left_keys_s1 và left_keys_s2 là các phép biến đổi không có hành động, có nghĩa là Spark không tạo ra bất kỳ biểu đồ luồng RDD nào và do đó chúng không bao giờ được thực hiện. Những gì tôi nhận được ngay bây giờ là một kết nối mà chỉ xuất ra các bản ghi có khóa trong stream1 và stream2 trong cùng một khoảng thời gian.

Các bạn có đề xuất nào để triển khai chính xác điều này với Spark không?

Cảm ơn, Marco

Trả lời

1

Nó nên có thể để mang sang toàn bộ giá trị từ một đợt tiếp theo bằng cách giữ một tham chiếu đến một RDD nơi chúng tôi những giá trị được tổ chức.

Đừng cố hợp nhất các luồng bằng cách sử dụng queueDStream, thay vào đó khai báo tham chiếu RDD có thể thay đổi có thể cập nhật tại mỗi khoảng thời gian phát trực tuyến.

Đây là một ví dụ:

Trong công việc trực tuyến này, chúng tôi bắt đầu với một RDD carring 100 số nguyên. Mỗi khoảng thời gian, 10 số ngẫu nhiên được tạo và trừ cho 100 số nguyên ban đầu. Quá trình này tiếp tục cho đến khi RDD ban đầu với 100 phần tử trống. Ví dụ này cho thấy làm thế nào để thực hiện trên các yếu tố từ một khoảng thời gian tiếp theo.

import scala.util.Random 
    import org.apache.spark.streaming.dstream._ 

    val ssc = new StreamingContext(sparkContext, Seconds(2)) 

    var targetInts:RDD[Int] = sc.parallelize(0 until 100) 

    var loops = 0 

    // we create an rdd of functions that generate random data. 
    // evaluating this RDD at each interval will generate new random data points. 
    val randomDataRdd = sc.parallelize(1 to 10).map(_ =>() => Random.nextInt(100)) 

    val dstream = new ConstantInputDStream(ssc, randomDataRdd) 

    // create values from the random func rdd 

    dataDStream.foreachRDD{rdd => 
         loops += 1 
         targetInts = targetInts.subtract(rdd) 
         if (targetInts.isEmpty) {println(loops); ssc.stop(false)} 
         } 


    ssc.start() 

Chạy ví dụ này và âm mưu chống lại loopstargetInts.count cho biểu đồ sau:

Removing 100 ints by generating random numbers

Tôi hy vọng điều này cung cấp cho bạn đủ hướng dẫn để thực hiện các usecase hoàn tất.

Các vấn đề liên quan