Có cách nào để sử dụng giản đồ để chuyển đổi avro thư từ kafka bằng spark thành dataframe? Các tập tin schema cho hồ sơ người sử dụng:Sử dụng lược đồ để chuyển đổi tin nhắn AVRO bằng Spark thành DataFrame
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
Và đoạn mã từ SqlNetworkWordCount example và Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages để đọc trong tin nhắn.
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
case class User(firstName: String, lastName: String)
Bằng cách nào đó tôi không thể tìm cách khác để chuyển đổi thư AVRO thành DataFrame. Có khả năng sử dụng lược đồ thay thế không? Tôi đang sử dụng Spark 1.6.2
và Kafka 0.10
.
Mã hoàn chỉnh, trong trường hợp bạn quan tâm.
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
object ReadMessagesFromKafka {
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
def main(args: Array[String]) {
val brokers = "127.0.0.1:9092"
val topics = "test"
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc, kafkaParams, topicsSet)
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
'spark-avro 2.0.1' yêu cầu đường dẫn làm đầu vào và không thể xử lý mảng [Byte]. Do đó 'spark.read.avro (message._2)' ném một loại không phù hợp. –
Làm thế nào về microbatching nhiều tin nhắn và viết nó vào một thư mục/tmp/ và đọc từ nó? Nếu bạn đang sử dụng Spark 2.0, điều này sẽ hoạt động: spark.read.format ("com.databricks.spark.avro"). Schema (DataType.fromJson ("path/to/schema.json"). AsInstanceOf [StructType]) .load ("/ tmp/") .show() –