2015-09-15 15 views
5

Tôi mới sử dụng Spark và muốn biết liệu có các tùy chọn nào khác ngoài những tùy chọn dưới đây để đọc dữ liệu được lưu trữ trong hdfs từ RStudio sử dụng SparkR hoặc nếu tôi sử dụng chúng một cách chính xác. Dữ liệu có thể là bất kỳ loại nào (văn bản thuần túy, csv, json, xml hoặc bất kỳ cơ sở dữ liệu nào chứa các bảng quan hệ) và có kích thước bất kỳ (1kb - vài gb).Tùy chọn đọc các tệp lớn (thuần văn bản, xml, json, csv) từ hdfs trong RStudio với SparkR 1.5

Tôi biết rằng textFile (sc, đường dẫn) không nên được sử dụng nữa, nhưng có khả năng nào khác để đọc các loại dữ liệu như vậy ngoài hàm read.df không?

Mã sau đây sử dụng read.df và jsonFile nhưng jsonFile tạo ra một lỗi:

Sys.setenv(SPARK_HOME = "C:\\Users\\--\\Downloads\\spark-1.5.0-bin-hadoop2.6") 
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
#load the Sparkr library 
library(SparkR) 

# Create a spark context and a SQL context 
sc <- sparkR.init(master="local", sparkPackages="com.databricks:spark-csv_2.11:1.0.3") 
sqlContext <- sparkRSQL.init(sc) 

#create a sparkR DataFrame 
df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/people.json", source = "json") 
df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json") 

read.df làm việc cho json, nhưng làm thế nào để đọc văn bản như thông điệp log được chỉ cách nhau bởi một dòng mới? Ví dụ.

> df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/README.txt", "text") 
    Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
    java.lang.ClassNotFoundException: Failed to load class for data source: text. 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) 
    at org.apache.spark.sql.api.r.SQLUtils$.loadDF(SQLUtils.scala:156) 
    at org.apache.spark.sql.api.r.SQLUtils.loadDF(SQLUtils.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132) 
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79) 
    at org.apache.spark.ap 

Các lỗi với jsonFile là:

> df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json") 
    Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
    java.io.IOException: No input paths specified in job 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfu 

Tôi không biết tại sao read.df ném một lỗi vì tôi không restard SparkR hoặc gọi SparkR.stop()

Đối cùng một mã bên cạnh việc sử dụng read.df tôi sử dụng hàm SparkR ::: textFile và sc thay vì sqlContext (theo Giới thiệu lỗi thời trên amplab).

Các thông báo lỗi là:

data <- SparkR:::textFile(sc, "hdfs://0.0.0.0:19000/people.json") 
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
    java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 5: hdfs: 
    at org.apache.hadoop.fs.Path.initialize(Path.java:206) 
    at org.apache.hadoop.fs.Path.<init>(Path.java:172) 
    at org.apache.hadoop.fs.Path.<init>(Path.java:94) 
    at org.apache.hadoop.fs.Globber.glob(Globber.java:211) 
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644) 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at or 

Lỗi này trông như con đường không đúng, nhưng tôi không biết tại sao.

Những gì tôi hiện đang sử dụng:

spark-1.5.0-bin-hadoop2.6 hadoop-2.6.0 Windows (8.1) R Version 3.2.2 Rstudio Version 0.99.484

Tôi hy vọng ai đó có thể cho tôi một số gợi ý về vấn đề này ở đây.

Trả lời

1

Hãy thử

% hadoop fs -put people.json/
    % sparkR 
    > people <- read.df(sqlContext, "/people.json", "json") 
    > head(people) 
0

Bạn có lẽ cần một thư viện để phân tích các file khác, như DataBricks thư viện CSV:

https://github.com/databricks/spark-csv

Sau đó, bạn sẽ bắt đầu R với gói nạp, ví dụ:

$ sparkR --packages com.databricks:spark-csv_2.10:1.0.3

và tải tập tin của bạn như:

> df <- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", inferSchema = "true")

này giả định bạn có "cars.csv" tập tin kiểm tra trong thư mục HDFS nhà của bạn.

hth

+0

Anh ấy sử dụng tính năng này trong tia khởi động- Và anh ấy đang nói về việc khởi chạy từ RStudio thay vì từ dòng lệnh. –

Các vấn đề liên quan