Giả sử Foo
là một lớp trường hợp như thế này:
import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)
Sử dụng RDDs đồng bằng:
import org.apache.spark.rdd.RDD
import scala.math.Ordering
val rdd: RDD[Foo] = sc
.parallelize(Seq(
("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
.toDF("name", "createDate")
.withColumn("createDate", $"createDate".cast("date"))
.as[Foo].rdd
rdd.cache()
val n = scala.math.ceil(0.1 * rdd.count).toInt
dữ liệu phù hợp với bộ nhớ điều khiển:
và phần bạn muốn là tương đối nhỏ
rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
// Array[Foo] = Array(Foo(a,2009-11-23))
phần bạn muốn là tương đối lớn:
rdd.sortBy(_.createDate.getTime).take(n)
khác
rdd
.sortBy(_.createDate.getTime)
.zipWithIndex
.filter{case (_, idx) => idx < n}
.keys
Sử dụng DataFrame (lưu ý - đây thực sự không phải là hiệu suất tối ưu khôn ngoan do hành vi giới hạn).
import org.apache.spark.sql.Row
val topN = rdd.toDF.orderBy($"createDate").limit(n)
topN.show
// +----+----------+
// |name|createDate|
// +----+----------+
// | a|2009-11-23|
// +----+----------+
// Optionally recreate RDD[Foo]
topN.map{case Row(name: String, date: Date) => Foo(name, date)}
Hi zero323 bạn có thể nói thực sự nhanh chóng tại sao hiệu suất DataFrame là tối ưu về hoạt động giới hạn? sự khác biệt nào so sánh với đầu trên RDD trong triển khai khôn ngoan? @ zero333 –
@XinweiLiu Tôi đã cung cấp câu trả lời cho câu hỏi của bạn. Tôi hy vọng nó giải thích những gì đang xảy ra. – zero323
Câu trả lời hay nhất @ zero323. Nhưng tôi vẫn có cùng một câu hỏi xinwei Liu có. Tại sao df.limit() chậm? – guilhermecgs