2014-10-23 25 views
8

Bối cảnh: Tôi đang sử dụng Apache Spark để tổng hợp số lượng sự kiện khác nhau từ nhật ký. Các bản ghi được lưu trữ trong cả Cassandra cho mục đích phân tích lịch sử và Kafka cho mục đích phân tích thời gian thực. Mỗi nhật ký có ngày và loại sự kiện. Vì mục đích đơn giản, giả sử tôi muốn theo dõi số lượng nhật ký của một loại duy nhất cho mỗi ngày.Kết hợp các kết quả từ hàng loạt RDD với streaming RDD trong Apache Spark

Chúng tôi có hai RDD, RDD của dữ liệu lô từ Cassandra và một RDD khác từ Kafka. Mã giả:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type"); 

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() { 
    @Override 
    public Tuple2<String, Integer> call(CassandraRow row) { 
     return new Tuple2<String, Integer>(row.getString("date"), 1); 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}); 

save(batchRDD) // Assume this saves the batch RDD somewhere 

... 

// Assume we read a chunk of logs from the Kafka stream every x seconds. 
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...); 
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() { 
    @Override 
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) { 
     String jsonString = data._2; 
     JSON jsonObj = JSON.parse(jsonString); 
     Date eventDate = ... // get date from json object 
     // Assume startTime is broadcast variable that is set to the time when the job started. 
     if (eventDate.after(startTime.value())) { 
      ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>(); 
      pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1)); 
      return pairs; 
     } else { 
      return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs 
     } 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() { 
    @Override 
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) { 
     Integer previousValue = state.or(0l); 
     Integer currentValue = ... // Sum of counts 
     return Optional.of(previousValue + currentValue); 
    } 
}); 
save(streamRDD); // Assume this saves the stream RDD somewhere 

sc.start(); 
sc.awaitTermination(); 

Câu hỏi: Làm thế nào để kết hợp các kết quả từ streamRDD với batchRDD? Hãy nói rằng batchRDD có các dữ liệu sau và công việc này được chạy trên 2014/10/16:

("2014-10-15", 1000000) 
("2014-10-16", 2000000) 

Kể từ khi truy vấn Cassandra chỉ bao gồm tất cả các dữ liệu lên đến thời gian bắt đầu của truy vấn hàng loạt, chúng ta phải đọc từ Kafka khi truy vấn kết thúc, chỉ xem xét nhật ký sau thời gian bắt đầu của công việc. Chúng tôi giả định rằng truy vấn mất một thời gian dài. Điều này có nghĩa là tôi cần kết hợp các kết quả lịch sử với kết quả phát trực tuyến.

Để minh hoạ:

|------------------------|-------------|--------------|---------> 
tBatchStart    tStreamStart streamBatch1 streamBatch2 

Sau đó, giả sử rằng trong hàng loạt dòng đầu tiên chúng tôi đã nhận dữ liệu này:

("2014-10-19", 1000) 

Sau đó, tôi muốn kết hợp lô RDD với dòng RDD này để dòng RDD hiện có giá trị:

("2014-10-19", 2001000) 

Sau đó giả sử rằng trong luồng thứ hai e có dữ liệu này:

("2014-10-19", 4000) 

Sau đó dòng RDD nên được cập nhật để có giá trị:

("2014-10-19", 2005000) 

Và vân vân ...

Có thể sử dụng streamRDD.transformToPair(...) để kết hợp các streamRDD dữ liệu với dữ liệu batchRDD sử dụng join, nhưng nếu chúng tôi thực hiện điều này cho từng đoạn luồng, thì chúng tôi sẽ thêm số đếm từ batchRDD cho mỗi đoạn luồng làm cho giá trị trạng thái "gấp đôi", khi chỉ được thêm vào đoạn luồng đầu tiên.

Trả lời

4

Để giải quyết trường hợp này, tôi muốn công đoàn cơ sở RDD với kết quả của tổng hợp StateDStream mà giữ tổng số của dữ liệu trực tuyến. Điều này có hiệu quả cung cấp đường cơ sở cho dữ liệu được báo cáo trên mọi khoảng thời gian phát trực tuyến, mà không tính số lần đường cơ sở x lần.

Tôi đã thử ý tưởng đó bằng cách sử dụng mẫu WordCount và nó hoạt động.Thả tùy chọn này trên REPL cho một ví dụ sống:

(sử dụng nc -lk 9876 trên vỏ riêng biệt để cung cấp đầu vào cho các socketTextStream)

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.storage.StorageLevel 

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7) 
val defaultRdd = sc.parallelize(defaults) 

@transient val ssc = new StreamingContext(sc, Seconds(10)) 
ssc.checkpoint("/tmp/spark") 

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.flatMap(_.split(" ")) 
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _) 
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0)) 
} 
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey(_+_) 

wordCount.print() 
historicCount.print() 
runningTotal.print() 
ssc.start() 
+1

Cảm ơn. Tôi chỉ muốn thêm rằng thay vì sử dụng 'rdd.union (defaultRdd)' trong biến đổi tôi đã kết thúc bằng cách sử dụng 'rdd.leftOuterJoin (defaultRdd)' chỉ để 'runningTotal' không bao gồm các cặp không bị thay đổi. Sau đó, tôi chỉ cần lưu các cặp nơi giá trị của chúng đã thay đổi. – Bobby

0

Bạn có thể cung cấp cho updateStateByKey thử:

def main(args: Array[String]) { 

    val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
     val currentCount = values.foldLeft(0)(_ + _) 
     val previousCount = state.getOrElse(0) 
     Some(currentCount + previousCount) 
    } 

    // stream 
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1)) 
    ssc.checkpoint(".") 
    val lines = ssc.socketTextStream("127.0.0.1", 9999) 
    val words = lines.flatMap(_.split(" ")) 
    val pairs = words.map(word => (word, 1)) 
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc) 
    stateWordCounts.print() 
    ssc.start() 
    ssc.awaitTermination() 
} 
+0

Tôi đã sử dụng nó. Vấn đề là nếu giá trị trạng thái tùy chọn là null thì tôi phải mặc định cho một giá trị. Lý tưởng nhất là giá trị này được tính từ lô RDD. Vấn đề là 'updateStateByKey()' không truyền vào khóa, vì vậy tôi không thể tra cứu giá trị được tính từ gói RDD. – Bobby

Các vấn đề liên quan