2015-04-20 13 views
7

Tôi đang viết một công việc về tia lửa bằng cách sử dụng python. Tuy nhiên, tôi cần phải đọc trong một loạt các tập tin avro.Cách đọc tệp Avro trong PySpark

This là giải pháp gần nhất mà tôi đã tìm thấy trong thư mục mẫu của Spark. Tuy nhiên, bạn cần phải gửi kịch bản python này bằng cách sử dụng tia lửa gửi. Trong dòng lệnh của spark-submit, bạn có thể chỉ định lớp trình điều khiển, trong trường hợp đó, tất cả lớp avrokey, avrovalue của bạn sẽ được định vị.

avro_rdd = sc.newAPIHadoopFile(
     path, 
     "org.apache.avro.mapreduce.AvroKeyInputFormat", 
     "org.apache.avro.mapred.AvroKey", 
     "org.apache.hadoop.io.NullWritable", 
     keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter", 
     conf=conf) 

Trong trường hợp của tôi, tôi cần phải chạy tất cả mọi thứ trong kịch bản Python, tôi đã cố gắng để tạo ra một biến môi trường bao gồm các tập tin jar, ngón tay chéo Python sẽ thêm jar vào con đường nhưng rõ ràng nó không phải là , nó cho tôi lỗi không mong muốn.

os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar" 

Mọi người có thể giúp tôi đọc tệp avro trong một tập lệnh python không?

Trả lời

3

Bạn có thể sử dụng thư viện spark-avro. Đầu tiên cho phép tạo ra một ví dụ dữ liệu:

import avro.schema 
from avro.datafile import DataFileReader, DataFileWriter 

schema_string ='''{"namespace": "example.avro", 
"type": "record", 
"name": "KeyValue", 
"fields": [ 
    {"name": "key", "type": "string"}, 
    {"name": "value", "type": ["int", "null"]} 
] 
}''' 

schema = avro.schema.parse(schema_string) 

with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt: 
    wrt.append({"key": "foo", "value": -1}) 
    wrt.append({"key": "bar", "value": 1}) 

Đọc nó bằng cách sử spark-csv cũng đơn giản như thế này:

df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro") 
df.show() 

## +---+-----+ 
## |key|value| 
## +---+-----+ 
## |foo| -1| 
## |bar| 1| 
## +---+-----+ 
1

Các giải pháp trước đây đòi hỏi phải cài đặt một sự phụ thuộc Java của bên thứ ba, mà không phải là một cái gì đó hầu hết Python devs hài lòng với. Nhưng bạn không thực sự cần một thư viện bên ngoài nếu tất cả những gì bạn muốn làm là phân tích các tệp Avro của bạn với một lược đồ đã cho. Bạn có thể chỉ cần đọc các tập tin nhị phân và phân tích chúng với gói pyro python yêu thích của bạn.

Ví dụ, đây là cách bạn có thể tải file Avro sử dụng fastavro:

from io import BytesIO 
import fastavro 

schema = { 
    ... 
} 

rdd = sc.binaryFiles("/path/to/dataset/*.avro")\ 
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema)) 

print(rdd.collect())