2016-08-20 18 views
12

Có cách nào để sử dụng giản đồ để chuyển đổi thư từ bằng thành ? Các tập tin schema cho hồ sơ người sử dụng:Sử dụng lược đồ để chuyển đổi tin nhắn AVRO bằng Spark thành DataFrame

{ 
    "fields": [ 
    { "name": "firstName", "type": "string" }, 
    { "name": "lastName", "type": "string" } 
    ], 
    "name": "user", 
    "type": "record" 
} 

Và đoạn mã từ SqlNetworkWordCount exampleKafka, Spark and Avro - Part 3, Producing and consuming Avro messages để đọc trong tin nhắn.

object Injection { 
    val parser = new Schema.Parser() 
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json")) 
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema) 
} 

... 

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => { 
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
    import sqlContext.implicits._ 

    val df = rdd.map(message => Injection.injection.invert(message._2).get) 
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF() 

    df.show() 
}) 

case class User(firstName: String, lastName: String) 

Bằng cách nào đó tôi không thể tìm cách khác để chuyển đổi thư AVRO thành DataFrame. Có khả năng sử dụng lược đồ thay thế không? Tôi đang sử dụng Spark 1.6.2Kafka 0.10.

Mã hoàn chỉnh, trong trường hợp bạn quan tâm.

import com.twitter.bijection.Injection 
import com.twitter.bijection.avro.GenericAvroCodecs 
import kafka.serializer.{DefaultDecoder, StringDecoder} 
import org.apache.avro.Schema 
import org.apache.avro.generic.GenericRecord 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.streaming.{Seconds, StreamingContext, Time} 
import org.apache.spark.{SparkConf, SparkContext} 

object ReadMessagesFromKafka { 
    object Injection { 
    val parser = new Schema.Parser() 
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json")) 
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema) 
    } 

    def main(args: Array[String]) { 
    val brokers = "127.0.0.1:9092" 
    val topics = "test" 

    // Create context with 2 second batch interval 
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]") 
    val ssc = new StreamingContext(sparkConf, Seconds(2)) 

    // Create direct kafka stream with brokers and topics 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams, topicsSet) 

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => { 
     val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
     import sqlContext.implicits._ 

     val df = rdd.map(message => Injection.injection.invert(message._2).get) 
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF() 

     df.show() 
    }) 

    // Start the computation 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

/** Case class for converting RDD to DataFrame */ 
case class User(firstName: String, lastName: String) 

/** Lazily instantiated singleton instance of SQLContext */ 
object SQLContextSingleton { 
    @transient private var instance: SQLContext = _ 

    def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
     instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
} 

Trả lời

2

OP này có lẽ giải quyết vấn đề nhưng để tham khảo trong tương lai Tôi đã giải quyết vấn đề này khá thường xuyên vì vậy nghĩ rằng nó có thể hữu ích để đăng ở đây.

Vì vậy, nói chung bạn nên chuyển đổi schema Avro đến một StructType tia lửa và cũng chuyển đổi các đối tượng mà bạn có trong RDD của bạn để Row [Bất kỳ] và sau đó sử dụng:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row}>,<schema as StructType> 

Để chuyển đổi schema Avro tôi đã từng spark-avro như vậy:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] 

các chuyển đổi của RDD là phức tạp hơn .. nếu schema của bạn là đơn giản bạn có thể có lẽ chỉ làm một bản đồ đơn giản .. một cái gì đó như thế này:

rdd.map(obj=>{ 
    val seq = (obj.getName(),obj.getAge() 
    Row.fromSeq(seq)) 
    }) 

Trong ví dụ này đối tượng có 2 tên trường và độ tuổi.

Điều quan trọng là đảm bảo các phần tử trong Hàng sẽ khớp với thứ tự và loại trường trong StructType từ trước.

Trong trường hợp perticular của tôi, tôi đã có một đối tượng phức tạp hơn nhiều mà tôi muốn xử lý chung để hỗ trợ các thay đổi lược đồ trong tương lai, vì vậy mã của tôi phức tạp hơn nhiều.

phương pháp được đề xuất bởi OP cũng nên làm việc trên một số casese nhưng sẽ khó có thể bao hàm trên đối tượng phức tạp (không nguyên thủy hoặc trường lớp)

tip khác là nếu bạn có một lớp học trong một lớp học mà bạn nên chuyển đổi lớp để một Row để các lớp gói sẽ được chuyển đổi sang một cái gì đó như:

Row(Any,Any,Any,Row,...) 

bạn cũng có thể xem xét các dự án spark-Avro tôi đã đề cập trước đây về làm thế nào để chuyển đổi đối tượng để hàng .. tôi sử dụng một số của logic có bản thân mình

Nếu ai đó đọc điều này cần trợ giúp thêm, hãy hỏi tôi trong các nhận xét và tôi sẽ cố gắng giúp đỡ

3

Tôi đã làm việc về vấn đề tương tự, nhưng trong Java. Vì vậy, không chắc chắn về Scala, nhưng hãy xem thư viện com.databricks.spark.avro (https://github.com/databricks/spark-avro). Hy vọng nó sẽ giúp

3

Hãy dành một cái nhìn lúc này https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/AvroSuite.scala

Vì vậy, thay vì

val df = rdd.map(message => Injection.injection.invert(message._2).get) 
.map(record => User(record.get("firstName").toString,records.get("lastName").toString)).toDF() 

bạn có thể thử

val df = spark.read.avro(message._2.get) 
+0

'spark-avro 2.0.1' yêu cầu đường dẫn làm đầu vào và không thể xử lý mảng [Byte]. Do đó 'spark.read.avro (message._2)' ném một loại không phù hợp. –

+1

Làm thế nào về microbatching nhiều tin nhắn và viết nó vào một thư mục/tmp/ và đọc từ nó? Nếu bạn đang sử dụng Spark 2.0, điều này sẽ hoạt động: spark.read.format ("com.databricks.spark.avro"). Schema (DataType.fromJson ("path/to/schema.json"). AsInstanceOf [StructType]) .load ("/ tmp/") .show() –

1

Đối với bất cứ ai quan tâm đến việc xử lý điều này theo cách có thể xử lý thay đổi lược đồ mà không cần dừng và triển khai lại ứng dụng tia lửa của bạn (giả sử logic ứng dụng có thể xử lý việc này) xem số question/answer này.

Các vấn đề liên quan