Trước hết hãy để tôi chỉ ra rằng tôi khá mới mẻ với cả Spark và Scala. Tôi đang cố gắng điều tra hiệu suất Spark đã hứa bằng cách cố gắng di chuyển một trong những công việc Hadoop Map/Reduce mà tôi đã làm trong quá khứ. Công việc này mất 14 phút trên Hadoop bằng cách sử dụng 3x r3.2xlớn máy cho một đầu vào của 16 tập tin nén bzip 170mb mỗi. Tôi dịch nó để Scala/Spark tốt nhất mà tôi có thể vào một cái gì đó như thế này:Nút thắt cổ chai hiệu suất ở đâu trong mã Spark/Scala này?
val conceptData = spark.textFile(inputPath)
val result = conceptData.repartition(60).cache()
.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head))))))
.reduceByKey((a,b) => combine(a,b))
.map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2))
result.saveAsTextFile(outputPath)
def print(tuples: List[(String, Any)]): String =
{
tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _)
}
def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) =
{
(a._1 + b._1,a._2 ++ b._2)
}
object JsonUtil {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
}
tôi đã sử dụng lệnh phân vùng lại ngay từ đầu để thiết lập phân vùng đến 60 kể từ khi tôi đọc ở đâu đó là tốt để có 2-3 phân vùng trên mỗi lõi. Tôi đang chạy việc Spark này trên cùng một máy 3x r3.2xlarge (mỗi có 8 lõi và 58g sẵn) vì vậy tôi nộp công việc của tôi theo cách sau đây:
spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...)
Và phải mất hơn 1 giờ để chạy thông qua cùng một đầu vào ... Tôi không chắc chắn nếu vấn đề là trong cấu hình Scala hoặc Spark để giúp đỡ bất kỳ được chào đón.
Trân trọng, Augusto
EDIT 1: lần trung bình đối với một số hoạt động:
Đọc các tập tin từ S3: ~ 2minutes
flatMap: ~ 11 phút
reduceByKey :> 1 giờ
Các phím được sử dụng là đường dẫn S3 họ có thể nhận được khá lâu, không biết nếu điều đó tạo nên sự khác biệt.
EDIT 2: tôi thay các reduceByKey
chức năng với .reduceByKey((a,b) => a)
và công việc kết thúc dưới 10min vì vậy phải có một cái gì đó thực sự xảy ra với combine
chức năng
vài câu hỏi. 1. Bạn có thể nói phiên bản Spark bạn đang sử dụng không? 2. Tôi giả sử bạn đang chạy ở chế độ Standalone. 3. Bạn đã xem giao diện người dùng Spark chưa. Nó nói gì ? Cuối cùng, bạn đã thử chạy từng hoạt động này từ Scala REPL chưa? Bạn có thể làm điều đó bằng cách chia nhỏ dòng đầu tiên của mình. Spark nhanh hơn nhiều so với điều này. Xem nhanh mã của bạn cho biết bạn chỉ đang phân tích cú pháp dữ liệu 3G và chạy MR trên đó. Nếu bạn đang sử dụng Spark 1.1+ bạn có thể tải 'conceptData' dưới dạng bảng SparkSQL và sau đó đọc giá trị ở đó và xem mất bao lâu. –
1) Spark 1.2 2) Standalone 3) Giao diện người dùng chỉ ra rằng cho đến khi dòng flatMap, thực hiện mất khoảng 7 phút, nó đã được sau khi hầu hết thời gian đã được chi tiêu. – Augusto
Làm thế nào về chỉ làm như sau trên REPL. 'val conceptData = spark.textFile (inputPath)' theo sau là 'conceptData.count' và xem mất bao lâu. Bạn nên sử dụng các thiết lập tương tự từ Scala REPL của bạn (tức là, spark/bin/spark-shell - bộ nhớ công cộng 58G ..) Tôi cũng giả định rằng bạn đang tải từ HDFS cục bộ (trên EC2). –