2015-01-03 15 views
7

Trước hết hãy để tôi chỉ ra rằng tôi khá mới mẻ với cả Spark và Scala. Tôi đang cố gắng điều tra hiệu suất Spark đã hứa bằng cách cố gắng di chuyển một trong những công việc Hadoop Map/Reduce mà tôi đã làm trong quá khứ. Công việc này mất 14 phút trên Hadoop bằng cách sử dụng 3x r3.2xlớn máy cho một đầu vào của 16 tập tin nén bzip 170mb mỗi. Tôi dịch nó để Scala/Spark tốt nhất mà tôi có thể vào một cái gì đó như thế này:Nút thắt cổ chai hiệu suất ở đâu trong mã Spark/Scala này?

val conceptData = spark.textFile(inputPath) 
val result = conceptData.repartition(60).cache() 
    .map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)}) 
    .flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head)))))) 
    .reduceByKey((a,b) => combine(a,b)) 
    .map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2)) 
result.saveAsTextFile(outputPath) 

def print(tuples: List[(String, Any)]): String = 
{ 
    tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _) 
} 

def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) = 
{ 
    (a._1 + b._1,a._2 ++ b._2) 
} 

object JsonUtil { 
    val mapper = new ObjectMapper() with ScalaObjectMapper 
    mapper.registerModule(DefaultScalaModule) 
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 

    def fromJson[T](json: String)(implicit m : Manifest[T]): T = { 
    mapper.readValue[T](json) 
    } 
} 

tôi đã sử dụng lệnh phân vùng lại ngay từ đầu để thiết lập phân vùng đến 60 kể từ khi tôi đọc ở đâu đó là tốt để có 2-3 phân vùng trên mỗi lõi. Tôi đang chạy việc Spark này trên cùng một máy 3x r3.2xlarge (mỗi có 8 lõi và 58g sẵn) vì vậy tôi nộp công việc của tôi theo cách sau đây:

spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...) 

Và phải mất hơn 1 giờ để chạy thông qua cùng một đầu vào ... Tôi không chắc chắn nếu vấn đề là trong cấu hình Scala hoặc Spark để giúp đỡ bất kỳ được chào đón.

Trân trọng, Augusto

EDIT 1: lần trung bình đối với một số hoạt động:

Đọc các tập tin từ S3: ~ 2minutes

flatMap: ~ 11 phút

reduceByKey :> 1 giờ

Các phím được sử dụng là đường dẫn S3 họ có thể nhận được khá lâu, không biết nếu điều đó tạo nên sự khác biệt.

EDIT 2: tôi thay các reduceByKey chức năng với .reduceByKey((a,b) => a) và công việc kết thúc dưới 10min vì vậy phải có một cái gì đó thực sự xảy ra với combine chức năng

+0

vài câu hỏi. 1. Bạn có thể nói phiên bản Spark bạn đang sử dụng không? 2. Tôi giả sử bạn đang chạy ở chế độ Standalone. 3. Bạn đã xem giao diện người dùng Spark chưa. Nó nói gì ? Cuối cùng, bạn đã thử chạy từng hoạt động này từ Scala REPL chưa? Bạn có thể làm điều đó bằng cách chia nhỏ dòng đầu tiên của mình. Spark nhanh hơn nhiều so với điều này. Xem nhanh mã của bạn cho biết bạn chỉ đang phân tích cú pháp dữ liệu 3G và chạy MR trên đó. Nếu bạn đang sử dụng Spark 1.1+ bạn có thể tải 'conceptData' dưới dạng bảng SparkSQL và sau đó đọc giá trị ở đó và xem mất bao lâu. –

+0

1) Spark 1.2 2) Standalone 3) Giao diện người dùng chỉ ra rằng cho đến khi dòng flatMap, thực hiện mất khoảng 7 phút, nó đã được sau khi hầu hết thời gian đã được chi tiêu. – Augusto

+0

Làm thế nào về chỉ làm như sau trên REPL. 'val conceptData = spark.textFile (inputPath)' theo sau là 'conceptData.count' và xem mất bao lâu. Bạn nên sử dụng các thiết lập tương tự từ Scala REPL của bạn (tức là, spark/bin/spark-shell - bộ nhớ công cộng 58G ..) Tôi cũng giả định rằng bạn đang tải từ HDFS cục bộ (trên EC2). –

Trả lời

0

này đi xuống noobish kỹ năng lập trình Scala của tôi - nó chỉ mất 15 phút khi thay đổi như sau performant hơn Scala:

val conceptData = spark.textFile(inputPath).repartition(24) 

val result = conceptData.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)}) 
    .flatMap(metrics => metrics._2.map(t => (t._1,(1, List(metrics._1+"\u200e"+ t._2.head))))) 
    .reduceByKey((a,b) => (a._1 + b._1, a._2:::b._2)) 
    .map(t=> t._1 + "\t" + t._2._1 + "\t" + t._2._2.mkString("\u200f")) 

Nó có thể có lẽ vẫn còn được cải thiện hơn nữa. Dù sao, cảm ơn sự giúp đỡ của mọi người.

Trân trọng,

Augusto

0

Dựa trên thực tế là hầu hết thời gian được chi tiêu sau khi flatMap, tôi sẽ nghi ngờ rằng shuffle là những gì làm chậm bạn xuống và không sử dụng CPU. Bạn có thể muốn thử chạy công việc với ít phân vùng hơn. Một điều bạn có thể thử là thay thế reduceByKey() bằng foldByKey() là kết hợp nhưng không giao hoán có nghĩa là nó phải giữ thứ tự RDD khi chạy kết hợp và điều này có thể chuyển thành lưu lượng truy cập mạng ít hơn trong khi phát ngẫu nhiên.

+0

Tôi đã thử dùng foldByKey và công việc chết nếu tôi sử dụng nó. Tôi đã thử nó hai lần và nó đã xảy ra tại cùng một thời điểm trong thời gian (sau 27 phút (- thông báo lỗi duy nhất tôi có thể tìm thấy là: 'executor.CoarseGrainedExecutorBackend: Driver Disassociated' – Augusto

Các vấn đề liên quan