Spark hỗ trợ mặc định file nén
Theo Spark Programming Guide
All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").
này có thể được mở rộng bằng cách cung cấp thông tin về những gì các định dạng nén được hỗ trợ bởi Hadoop, mà về cơ bản có thể được kiểm tra bằng cách tìm tất cả các lớp học mở rộng CompressionCodec
(docs)
name | ext | codec class
-------------------------------------------------------------
bzip2 | .bz2 | org.apache.hadoop.io.compress.BZip2Codec
default | .deflate | org.apache.hadoop.io.compress.DefaultCodec
deflate | .deflate | org.apache.hadoop.io.compress.DeflateCodec
gzip | .gz | org.apache.hadoop.io.compress.GzipCodec
lz4 | .lz4 | org.apache.hadoop.io.compress.Lz4Codec
snappy | .snappy | org.apache.hadoop.io.compress.SnappyCodec
Nguồn: List the available hadoop codecs
Vì vậy, các định dạng trên và nhiều khả năng hơn có thể đạt được chỉ đơn giản bằng cách gọi:
sc.readFile(path)
Đọc các tệp zip trong Spark
Thật không may, zip
không có trong danh sách được hỗ trợ theo mặc định.
Tôi đã tìm thấy một bài viết tuyệt vời: Hadoop: Processing ZIP files in Map/Reduce và một số câu trả lời (example) giải thích cách sử dụng nhập ZipFileInputFormat
cùng với sc.newAPIHadoopFile
API. Nhưng điều này không làm việc cho tôi.
Giải pháp của tôi
Nếu không có bất kỳ phụ thuộc bên ngoài, bạn có thể tải tập tin của bạn với sc.binaryFiles
và sau đó giải nén các PortableDataStream
đọc nội dung. Đây là phương pháp tôi đã chọn.
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {
def readFile(path: String,
minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {
if (path.endsWith(".zip")) {
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
// this solution works only for single file in the zip
val entry = zis.getNextEntry
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}
} else {
sc.textFile(path, minPartitions)
}
}
}
sử dụng lớp tiềm ẩn này, bạn cần phải nhập nó và gọi phương thức readFile
trên SparkContext
:
import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
Và lớp ngầm sẽ được tải tập tin zip
của bạn đúng cách và trở RDD[String]
như nó được sử dụng để .
Lưu ý: Điều này chỉ hoạt động đối với một tệp trong kho lưu trữ zip!
Đối với nhiều tập tin trong hỗ trợ zip, hãy kiểm tra câu trả lời này: https://stackoverflow.com/a/45958458/1549135
Tôi có một vấn đề tương tự. Tôi đã thử điều này nhưng nhận được lỗi. bạn có thể giúp gì không? val zipFileRDD = sc.binaryFiles (zipFile) .flatMap {trường hợp (tên: Chuỗi, nội dung: PortableDataStream) => new ZipInputStream (nội dung.mở)} > >: 95: lỗi: loại không phù hợp; > được tìm thấy: java.util.zip.ZipInputStream > bắt buộc: TraversableOnce [?] > val zipFileRDD = sc.binaryFiles (zipFile) .flatMap {case (name, content) => new ZipInputStream (content.open)} –
Pooja3101