2015-10-12 17 views
13

Kịch bản: Thông tin nhập của tôi sẽ là nhiều XML nhỏ và được cho là đọc các XML này dưới dạng RDD. Thực hiện kết hợp với một tập dữ liệu khác và tạo một RDD và gửi đầu ra dưới dạng XML.Xử lý Xml trong Spark

Có thể đọc XML bằng tia lửa, tải dữ liệu dưới dạng RDD không? Nếu có thể đọc XML như thế nào.

mẫu XML:

<root> 
    <users> 
     <user> 
       <account>1234<\account> 
       <name>name_1<\name> 
       <number>34233<\number> 
     <\user> 
     <user> 
       <account>58789<\account> 
       <name>name_2<\name> 
       <number>54697<\number> 
     <\user>  
    <\users> 
<\root> 

Làm thế nào sẽ này được nạp vào RDD?

+7

BTW, XML của bạn không phải là XML.Bạn cần phải thay thế tất cả '\\' thành '/' –

+0

Xin chào Pavani! Tôi bắt đầu với bài tập này trên Spark, và tôi muốn biết các giải pháp để nâng cao hơn trên lớp, bạn có thể giúp tôi không? –

Trả lời

15

Có thể, nhưng chi tiết sẽ khác nhau tùy thuộc vào phương pháp bạn thực hiện.

  • Nếu tệp nhỏ, như bạn đã đề cập, giải pháp đơn giản nhất là tải dữ liệu của bạn bằng cách sử dụng SparkContext.wholeTextFiles. Nó tải dữ liệu là RDD[(String, String)] trong đó phần tử đầu tiên là đường dẫn và nội dung tệp thứ hai. Sau đó, bạn phân tích từng tệp riêng lẻ như ở chế độ cục bộ.
  • Đối với các tệp lớn hơn, bạn có thể sử dụng Hadoop input formats.
    • Nếu cấu trúc đơn giản, bạn có thể chia tách bản ghi bằng cách sử dụng textinputformat.record.delimiter. Bạn có thể tìm thấy một ví dụ đơn giản here. Đầu vào không phải là một XML nhưng nó, bạn nên cung cấp cho quý vị và ý tưởng làm thế nào để tiến hành
    • Nếu không Mahout cung cấp XmlInputFormat
  • Cuối cùng người ta có thể đọc tập tin sử dụng SparkContext.textFile và điều chỉnh sau đó cho kỷ lục kéo dài giữa các phân vùng. Về mặt lý thuyết nó có nghĩa là một cái gì đó tương tự như tạo trượt cửa sổ hoặc partitioning records into groups of fixed size:

    • sử dụng mapPartitionsWithIndex phân vùng để xác định hồ sơ chia giữa các phân vùng, thu thập hồ sơ bị hỏng
    • sử dụng thứ hai mapPartitionsWithIndex để sửa chữa hồ sơ bị hỏng

Chỉnh sửa:

Ngoài ra còn có tương đối mới spark-xml gói cho phép bạn trích xuất các hồ sơ cụ thể bằng cách tag:

val df = sqlContext.read 
    .format("com.databricks.spark.xml") 
    .option("rowTag", "foo") 
    .load("bar.xml") 
4

Đây là cách để thực hiện nó -> Tôi đã sử dụng HadoopInputFormats để đọc dữ liệu XML trong tia lửa như được giải thích bởi zero323.

Input dữ liệu ->

<root> 
    <users> 
     <user> 
      <account>1234<\account> 
      <name>name_1<\name> 
      <number>34233<\number><\user> 
     <user> 
      <account>58789<\account> 
      <name>name_2<\name> 
      <number>54697<\number> 
     <\user> 
    <\users> 
<\root> 

Mã để đọc XML Input ->

Bạn sẽ nhận được một số lọ tại link

//---------------spark_import 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 

//----------------xml_loader_import 
import org.apache.hadoop.io.LongWritable 
import org.apache.hadoop.io.Text 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{ LongWritable, Text } 
import com.cloudera.datascience.common.XmlInputFormat 

object Tester_loader { 
    case class User(account: String, name: String, number: String) 
    def main(args: Array[String]): Unit = { 

    val sparkHome = "/usr/big_data_tools/spark-1.5.0-bin-hadoop2.6/" 
    val sparkMasterUrl = "spark://SYSTEMX:7077" 

    var jars = new Array[String](3) 

    jars(0) = "/home/hduser/Offload_Data_Warehouse_Spark.jar" 
    jars(1) = "/usr/big_data_tools/JARS/Spark_jar/avro/spark-avro_2.10-2.0.1.jar" 

    val conf = new SparkConf().setAppName("XML Reading") 
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
     .setMaster("local") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 
     .setSparkHome(sparkHome) 
     .set("spark.executor.memory", "512m") 
     .set("spark.default.deployCores", "12") 
     .set("spark.cores.max", "12") 
     .setJars(jars) 

    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    //---------------------------------loading user from XML 

    val pages = readFile("src/input_data", "<user>", "<\\user>", sc) //calling function 1.1 

    val xmlUserDF = pages.map { tuple => 
     { 
     val account = extractField(tuple, "account") 
     val name = extractField(tuple, "name") 
     val number = extractField(tuple, "number") 

     User(account, name, number) 
     } 
    }.toDF() 
    println(xmlUserDF.count()) 
    xmlUserDF.show() 
    } 

    //------------------------------------Functions 

    def readFile(path: String, start_tag: String, end_tag: String, sc: SparkContext) = { 
    val conf = new Configuration() 
    conf.set(XmlInputFormat.START_TAG_KEY, start_tag) 
    conf.set(XmlInputFormat.END_TAG_KEY, end_tag) 
    val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], 
     classOf[Text], conf) 

    rawXmls.map(p => p._2.toString) 
    } 

    def extractField(tuple: String, tag: String) = { 
    var value = tuple.replaceAll("\n", " ").replace("<\\", "</") 

    if (value.contains("<" + tag + ">") && value.contains("</" + tag + ">")) { 

     value = value.split("<" + tag + ">")(1).split("</" + tag + ">")(0) 

    } 
    value 
    } 
} 

Output này ->

+-------+------+------+ 
|account| name|number| 
+-------+------+------+ 
| 1234|name_1| 34233| 
| 58789|name_2| 54697| 
+-------+------+------+ 

kết quả thu được là trong dataframes bạn có thể chuyển đổi chúng sang RDD theo yêu cầu của bạn như thế này->

val xmlUserRDD = xmlUserDF.toJavaRDD.rdd.map { x => (x.get(0).toString(),x.get(1).toString(),x.get(2).toString()) } 

Hãy đánh giá nó, nếu nó có thể giúp bạn một số cách.

3

Điều này sẽ giúp bạn.

package packagename; 

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.SparkSession; 

import com.databricks.spark.xml.XmlReader; 

public class XmlreaderSpark { 
    public static void main(String arr[]){ 
    String localxml="file path"; 
    String booksFileTag = "user"; 

    String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse"; 
    System.out.println("warehouseLocation" + warehouseLocation); 
    SparkSession spark = SparkSession 
       .builder() 
       .master("local") 
       .appName("Java Spark SQL Example") 
       .config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", warehouseLocation) 
       .enableHiveSupport().config("set spark.sql.crossJoin.enabled", "true") 
       .getOrCreate(); 
    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> df = (new XmlReader()).withRowTag(booksFileTag).xmlFile(sqlContext, localxml); 
    df.show(); 

    } 
} 

Bạn cần phải thêm phụ thuộc này trong pom.xml của bạn:

<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-xml_2.10</artifactId> 
    <version>0.4.0</version> 
</dependency> 

và file đầu vào của bạn là không ở dạng thích hợp.

Cảm ơn.

3

Có hai lựa chọn tốt cho những trường hợp đơn giản:

  • wholeTextFiles. Sử dụng phương pháp ánh xạ với trình phân tích cú pháp XML của bạn, có thể là trình phân tích cú pháp kéo Scala XML (mã nhanh hơn) hoặc Trình phân tích cú pháp kéo SAX (hiệu năng tốt hơn).
  • Hadoop streaming XMLInputFormat mà bạn phải xác định khi bắt đầu và kết thúc thẻ <user></user> để xử lý nó, tuy nhiên, nó tạo ra một phân vùng cho mỗi người dùng thẻ
  • spark-xml package là một lựa chọn tốt quá.

Với tất cả các tùy chọn, bạn bị giới hạn chỉ xử lý XML đơn giản có thể được hiểu là tập dữ liệu với hàng và cột.

Tuy nhiên, nếu chúng tôi phức tạp một chút, các tùy chọn đó sẽ không hữu ích.

Ví dụ, nếu bạn có một thực thể hơn đó:

<root> 
    <users> 
    <user>...</users> 
    <companies> 
    <company>...</companies> 
</root> 

Bây giờ bạn cần phải tạo ra 2 RDDs và thay đổi phân tích cú pháp của bạn để nhận thẻ <company>.

Đây chỉ là một trường hợp đơn giản, nhưng XML có thể phức tạp hơn nhiều và bạn sẽ cần bao gồm nhiều thay đổi hơn.

Để giải quyết sự phức tạp này, chúng tôi đã xây dựng Flexter trên đỉnh Apache Spark để giảm bớt nỗi đau của processing XML files on Spark. Tôi cũng khuyên bạn nên đọc về converting XML on Spark to Parquet. Bài sau cũng bao gồm một số mẫu mã cho thấy đầu ra có thể được truy vấn bằng SparkSQL như thế nào.

Tuyên bố từ chối trách nhiệm: Tôi làm việc cho Sonra

+1

Tôi khuyên bạn nên thêm tuyên bố từ chối trách nhiệm rằng bạn là người đồng sáng lập của công ty này. – Davos

Các vấn đề liên quan