Bối cảnh: Tôi đang sử dụng Apache Spark để tổng hợp số lượng sự kiện khác nhau từ nhật ký. Các bản ghi được lưu trữ trong cả Cassandra cho mục đích phân tích lịch sử và Kafka cho mục đích phân tích thời gian thực. Mỗi nhật ký có ngày và loại sự kiện. Vì mục đích đơn giản, giả sử tôi muốn theo dõi số lượng nhật ký của một loại duy nhất cho mỗi ngày.Kết hợp các kết quả từ hàng loạt RDD với streaming RDD trong Apache Spark
Chúng tôi có hai RDD, RDD của dữ liệu lô từ Cassandra và một RDD khác từ Kafka. Mã giả:
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD) // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
sc.start();
sc.awaitTermination();
Câu hỏi: Làm thế nào để kết hợp các kết quả từ streamRDD với batchRDD? Hãy nói rằng batchRDD
có các dữ liệu sau và công việc này được chạy trên 2014/10/16:
("2014-10-15", 1000000)
("2014-10-16", 2000000)
Kể từ khi truy vấn Cassandra chỉ bao gồm tất cả các dữ liệu lên đến thời gian bắt đầu của truy vấn hàng loạt, chúng ta phải đọc từ Kafka khi truy vấn kết thúc, chỉ xem xét nhật ký sau thời gian bắt đầu của công việc. Chúng tôi giả định rằng truy vấn mất một thời gian dài. Điều này có nghĩa là tôi cần kết hợp các kết quả lịch sử với kết quả phát trực tuyến.
Để minh hoạ:
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
Sau đó, giả sử rằng trong hàng loạt dòng đầu tiên chúng tôi đã nhận dữ liệu này:
("2014-10-19", 1000)
Sau đó, tôi muốn kết hợp lô RDD với dòng RDD này để dòng RDD hiện có giá trị:
("2014-10-19", 2001000)
Sau đó giả sử rằng trong luồng thứ hai e có dữ liệu này:
("2014-10-19", 4000)
Sau đó dòng RDD nên được cập nhật để có giá trị:
("2014-10-19", 2005000)
Và vân vân ...
Có thể sử dụng streamRDD.transformToPair(...)
để kết hợp các streamRDD dữ liệu với dữ liệu batchRDD sử dụng join
, nhưng nếu chúng tôi thực hiện điều này cho từng đoạn luồng, thì chúng tôi sẽ thêm số đếm từ batchRDD cho mỗi đoạn luồng làm cho giá trị trạng thái "gấp đôi", khi chỉ được thêm vào đoạn luồng đầu tiên.
Cảm ơn. Tôi chỉ muốn thêm rằng thay vì sử dụng 'rdd.union (defaultRdd)' trong biến đổi tôi đã kết thúc bằng cách sử dụng 'rdd.leftOuterJoin (defaultRdd)' chỉ để 'runningTotal' không bao gồm các cặp không bị thay đổi. Sau đó, tôi chỉ cần lưu các cặp nơi giá trị của chúng đã thay đổi. – Bobby