2017-07-20 13 views
8

Ví dụ: Tôi cần để có được một danh sách của tất cả các nhà điều hành có sẵn và khả năng đa luồng tương ứng của họ (KHÔNG phải tổng công suất đa luồng, sc.defaultParallelism đã xử lý).Phương pháp để lấy số lõi cho một người thực hiện trên một nút tác vụ?

Do tham số này phụ thuộc vào triển khai thực hiện (YARN và độc lập tia lửa có chiến lược khác nhau để phân bổ lõi) và tình huống (có thể dao động do phân bổ động và chạy công việc dài hạn). Tôi không thể sử dụng phương pháp khác để ước tính điều này. Có cách nào để lấy thông tin này bằng cách sử dụng API Spark trong một chuyển đổi được phân phối không? (Ví dụ TaskContext, SparkEnv)

CẬP NHẬT Đối với Spark 1.6, tôi đã thử các phương pháp sau đây:

1) chạy một công việc 1 sân khấu với nhiều phân vùng (>> defaultParallelism) và đếm số threadIDs biệt cho mỗi executorID:

val n = sc.defaultParallelism * 16 
sc.parallelize(n, n).map(v => SparkEnv.get.executorID -> Thread.currentThread().getID) 
.groupByKey() 
.mapValue(_.distinct) 
.collect() 

Tuy nhiên điều này dẫn đến một ước tính cao hơn so với khả năng xử lý đa luồng thực tế bởi vì mỗi người thi hành Spark sử dụng một hồ bơi thread overprovisioned.

2) Tương tự như 1, ngoại trừ n = defaultParallesim, và trong mọi tác vụ, tôi thêm thời gian trễ để ngăn người đàm phán tài nguyên mất cân bằng (nút nhanh hoàn thành nhiệm vụ của nó và yêu cầu nhiều hơn trước khi các nút chậm có thể bắt đầu chạy):

val n = sc.defaultParallelism 
sc.parallelize(n, n).map{ 
    v => 
    Thread.sleep(5000) 
    SparkEnv.get.executorID -> Thread.currentThread().getID 
} 
.groupByKey() 
.mapValue(_.distinct) 
.collect() 

nó hoạt động phần lớn thời gian, nhưng chậm hơn nhiều so với mức cần thiết và có thể bị hỏng do cụm mất cân bằng hoặc công việc đầu cơ.

3) Tôi chưa thử: sử dụng phản chiếu java để đọc BlockManager.numUsableCores, đây rõ ràng không phải là giải pháp ổn định, việc triển khai nội bộ có thể thay đổi bất kỳ lúc nào.

Hãy cho tôi biết nếu bạn đã tìm thấy điều gì đó tốt hơn.

+0

Cảm ơn bạn Paul, đây là cho scala, tôi đăng nó vào ban đêm vì vậy đã không viết xuống điều tra của tôi, sẽ thêm lên sau này – tribbloid

+1

@Paul cập nhật, là nó đủ tốt? – tribbloid

+0

Có vẻ tốt hơn rất nhiều. – Paul

Trả lời

2

Rất dễ dàng với API nghỉ ngơi Spark. Bạn có để có được id ứng dụng:

val applicationId = spark.sparkContext.applicationId 

ui URL:

val baseUrl = spark.sparkContext.uiWebUrl 

và truy vấn:

val url = baseUrl.map { url => 
    s"${url}/api/v1/applications/${applicationId}/executors" 
} 

Với Apache HTTP thư viện (đã có trong phụ thuộc Spark, chuyển thể từ https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients):

import org.apache.http.impl.client.DefaultHttpClient 
import org.apache.http.client.methods.HttpGet 
import scala.util.Try 

val client = new DefaultHttpClient() 

val response = url 
    .flatMap(url => Try{client.execute(new HttpGet(url))}.toOption) 
    .flatMap(response => Try{ 
    val s = response.getEntity().getContent() 
    val json = scala.io.Source.fromInputStream(s).getLines.mkString 
    s.close 
    json 
    }.toOption) 

và json4s:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 
implicit val formats = DefaultFormats 

case class ExecutorInfo(hostPort: String, totalCores: Int) 

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try { 
    parse(json).extract[List[ExecutorInfo]] 
}.toOption) 

Chừng nào bạn giữ id ứng dụng và URL ui trong tầm tay và cổng ui mở các kết nối bên ngoài, bạn có thể làm điều tương tự từ bất kỳ nhiệm vụ.

+0

Cảm ơn rất nhiều vì câu trả lời! Chúng ta hãy chờ một vài tuần, tôi cảm thấy điều này có thể trở thành một mô hình chống nếu không được sử dụng một cách cẩn thận, Một chủ nhân có thể quản lý hàng ngàn nút và giao diện người dùng của nó không được thiết kế để DDoS được tất cả chúng thông qua một dữ liệu không hiệu quả giao thức tuần tự hóa. – tribbloid

2

Tôi sẽ cố gắng triển khai SparkListener theo cách tương tự như giao diện người dùng web. This code có thể hữu ích làm ví dụ.

+0

Ý tưởng hay! Trong Spark 1.6 đây là nơi duy nhất ExecutorInfo có thể đọc được, vì vậy có lẽ nó đáng để thử. Nhược điểm duy nhất là người nghe chỉ được kích hoạt trên trình điều khiển, vì vậy việc thực thi của nó không phải là cục bộ. – tribbloid

Các vấn đề liên quan