2015-03-10 28 views
6

Tôi đã đọc về Spark 's hỗ trợ cho gzip file đầu vào -kind here, và tôi tự hỏi, nếu sự hỗ trợ cùng tồn tại cho các loại hình khác nhau của các file nén như .zip file . Cho đến nay tôi đã thử tính toán một tệp nén dưới dạng tệp zip, nhưng Spark dường như không thể đọc nội dung của tệp thành công.Hỗ trợ nén trong Apache Spark

Tôi đã xem xét Hadoop 's newAPIHadoopFilenewAPIHadoopRDD, nhưng cho đến nay tôi chưa thể làm được gì.

Bên cạnh đó, Spark hỗ trợ việc tạo ra một phân vùng cho mỗi tập tin dưới một thư mục chỉ định, như trong ví dụ dưới đây:

SparkConf SpkCnf = new SparkConf().setAppName("SparkApp") 
            .setMaster("local[4]"); 

JavaSparkContext Ctx = new JavaSparkContext(SpkCnf); 

JavaRDD<String> FirstRDD = Ctx.textFile("C:\input\).cache(); 

đâu C:\input\ điểm vào một thư mục với nhiều file.

Trong trường hợp máy tính nén file sẽ có thể, nó sẽ cũng có thể để đóng gói tất cả các tập tin trong một file nén duy nhất và làm theo cùng một khuôn mẫu của một phân vùng cho mỗi tập tin?

Trả lời

7

Vì Apache Spark sử dụng định dạng Đầu vào Hadoop, chúng ta có thể xem tài liệu về cách xử lý tệp zip và xem có điều gì đó hoạt động không.

This site cung cấp cho chúng tôi ý tưởng về cách sử dụng điều này (cụ thể là chúng tôi có thể sử dụng ZipFileInputFormat). Điều đó đang được nói, vì các tệp zip không phải là bảng phân chia (xem this) yêu cầu của bạn để có một tệp nén duy nhất không thực sự được hỗ trợ tốt. Thay vào đó, nếu có thể, sẽ tốt hơn nếu có một thư mục chứa nhiều tệp zip riêng biệt.

Câu hỏi này tương tự như this other question, tuy nhiên, câu hỏi bổ sung là có thể có một tệp zip (vì đó không phải là định dạng bảng phân tách).

2

Bạn có thể sử dụng sc.binaryFiles để đọc Zip như tập tin nhị phân

val rdd = sc.binaryFiles(path).flatMap { case (name: String, content: PortableDataStream) => new ZipInputStream(content.open) } //=> RDD[ZipInputStream]

Và sau đó bạn có thể lập bản đồ ZipInputStream vào danh sách của dòng:

val zis = rdd.first val entry = zis.getNextEntry val br = new BufferedReader(new InputStreamReader(in, "UTF-8")) val res = Stream.continually(br.readLine()).takeWhile(_ != null).toList

Nhưng vấn đề vẫn còn tệp zip không thể chia nhỏ được.

+1

Tôi có một vấn đề tương tự. Tôi đã thử điều này nhưng nhận được lỗi. bạn có thể giúp gì không? val zipFileRDD = sc.binaryFiles (zipFile) .flatMap {trường hợp (tên: Chuỗi, nội dung: PortableDataStream) => new ZipInputStream (nội dung.mở)} > >: 95: lỗi: loại không phù hợp; > được tìm thấy: java.util.zip.ZipInputStream > bắt buộc: TraversableOnce [?] > val zipFileRDD = sc.binaryFiles (zipFile) .flatMap {case (name, content) => new ZipInputStream (content.open)} – Pooja3101

1

Bạn có thể sử dụng sc.binaryFiles để mở tệp nén ở định dạng nhị phân, sau đó giải nén tệp đó thành định dạng văn bản. Thật không may, các tập tin zip không phải là chia-thể .. Vì vậy, bạn cần phải chờ cho giải nén, sau đó có thể gọi shuffle để cân bằng dữ liệu trong mỗi phân vùng.

Đây là một ví dụ trong Python. Thông tin khác là trong http://gregwiki.duckdns.org/index.php/2016/04/11/read-zip-file-in-spark/

file_RDD = sc.binaryFiles(HDFS_path + data_path) 

def Zip_open(binary_stream_string) : # New version, treat a stream as zipped file 
    try : 
     pseudo_file = io.BytesIO(binary_stream_string) 
     zf = zipfile.ZipFile(pseudo_file) 
     return zf 
    except : 
     return None 

def read_zip_lines(zipfile_object) : 
    file_iter = zipfile_object.open('diff.txt') 
    data = file_iter.readlines() 
    return data 

My_RDD = file_RDD.map(lambda kv: (kv[0], Zip_open(kv[1]))) 
0

Dưới đây là ví dụ tìm kiếm thư mục.tệp zip và tạo RDD bằng cách sử dụng FileInputFormat tùy chỉnh được gọi là ZipFileInputFormat và API newAPIHadoopFile trên Ngữ cảnh Spark. Sau đó nó ghi các tệp đó vào một thư mục đầu ra.

allzip.foreach { x => 
    val zipFileRDD = sc.newAPIHadoopFile(
    x.getPath.toString, 
    classOf[ZipFileInputFormat], 
    classOf[Text], 
    classOf[BytesWritable], hadoopConf) 

    zipFileRDD.foreach { y => 
    ProcessFile(y._1.toString, y._2) 
    } 

https://github.com/alvinhenrick/apache-spark-examples/blob/master/src/main/scala/com/zip/example/Unzip.scala

Các ZipFileInputFormat sử dụng trong ví dụ này có thể được tìm thấy ở đây: https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop

2

Spark hỗ trợ mặc định file nén

Theo Spark Programming Guide

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").

này có thể được mở rộng bằng cách cung cấp thông tin về những gì các định dạng nén được hỗ trợ bởi Hadoop, mà về cơ bản có thể được kiểm tra bằng cách tìm tất cả các lớp học mở rộng CompressionCodec (docs)

name | ext  | codec class 
------------------------------------------------------------- 
bzip2 | .bz2  | org.apache.hadoop.io.compress.BZip2Codec 
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec 
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec 
gzip | .gz  | org.apache.hadoop.io.compress.GzipCodec 
lz4  | .lz4  | org.apache.hadoop.io.compress.Lz4Codec 
snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec 

Nguồn: List the available hadoop codecs

Vì vậy, các định dạng trên và nhiều khả năng hơn có thể đạt được chỉ đơn giản bằng cách gọi:

sc.readFile(path) 

Đọc các tệp zip trong Spark

Thật không may, zip không có trong danh sách được hỗ trợ theo mặc định.

Tôi đã tìm thấy một bài viết tuyệt vời: Hadoop: Processing ZIP files in Map/Reduce và một số câu trả lời (example) giải thích cách sử dụng nhập ZipFileInputFormat cùng với sc.newAPIHadoopFile API. Nhưng điều này không làm việc cho tôi.

Giải pháp của tôi

Nếu không có bất kỳ phụ thuộc bên ngoài, bạn có thể tải tập tin của bạn với sc.binaryFiles và sau đó giải nén các PortableDataStream đọc nội dung. Đây là phương pháp tôi đã chọn.

import java.io.{BufferedReader, InputStreamReader} 
import java.util.zip.ZipInputStream 
import org.apache.spark.SparkContext 
import org.apache.spark.input.PortableDataStream 
import org.apache.spark.rdd.RDD 

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal { 

    def readFile(path: String, 
       minPartitions: Int = sc.defaultMinPartitions): RDD[String] = { 

     if (path.endsWith(".zip")) { 
     sc.binaryFiles(path, minPartitions) 
      .flatMap { case (name: String, content: PortableDataStream) => 
      val zis = new ZipInputStream(content.open) 
      // this solution works only for single file in the zip 
      val entry = zis.getNextEntry 
      val br = new BufferedReader(new InputStreamReader(zis)) 
      Stream.continually(br.readLine()).takeWhile(_ != null) 
      } 
     } else { 
     sc.textFile(path, minPartitions) 
     } 
    } 
    } 

sử dụng lớp tiềm ẩn này, bạn cần phải nhập nó và gọi phương thức readFile trên SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext 
sc.readFile(path) 

Và lớp ngầm sẽ được tải tập tin zip của bạn đúng cách và trở RDD[String] như nó được sử dụng để .

Lưu ý: Điều này chỉ hoạt động đối với một tệp trong kho lưu trữ zip!
Đối với nhiều tập tin trong hỗ trợ zip, hãy kiểm tra câu trả lời này: https://stackoverflow.com/a/45958458/1549135

+0

Bạn không đóng các kết nối. – Programmer

+0

Bất kỳ ý tưởng nào về cách tiếp cận với thư mục chứa nhiều tệp zip (phân vùng)? –

Các vấn đề liên quan