2015-09-28 22 views
8

Nói tóm lại:Spark nhiều ngữ cảnh

cụm EC2: 1 thạc sĩ 3 nô lệ phiên bản

Spark: 1.3.1

Tôi muốn sử dụng tùy chọn spark.driver.allowMultipleContexts, một ngữ cảnh cục bộ (chỉ cái chính) và một cụm (chủ và nô lệ).

tôi nhận được lỗi stacktrace này (dòng 29 là nơi mà tôi gọi là đối tượng mà khởi tạo sparkcontext giây):

fr.entry.Main.main(Main.scala) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795) 
    at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847) 
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754) 
    at fr.entry.cluster$.<init>(Main.scala:79) 
    at fr.entry.cluster$.<clinit>(Main.scala) 
    at fr.entry.Main$delayedInit$body.apply(Main.scala:29) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.App$$anonfun$main$1.apply(App.scala:71) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) 
    at scala.App$class.main(App.scala:71) 
    at fr.entry.Main$.main(Main.scala:14) 
    at fr.entry.Main.main(Main.scala) 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/2 is now LOADING 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/0 is now RUNNING 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING 
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29 
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false) 
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29) 
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List() 
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List() 
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents 
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879 
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB) 
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879 
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB) 
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB) 
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839 
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING 
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29) 
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 

Thông tin chi tiết:

Tôi muốn chạy một chương trình mà không hai điều. Thứ nhất, tôi có một địa chỉ sparkContext (chỉ trên master), tôi tạo một RDD và thực hiện một số thao tác. Thứ hai, tôi có một khởi tạo sparkContext thứ hai với một master và 3 slaves mà cũng tạo một RDD và thực hiện một số hoạt động. Vì vậy, trong trường hợp đầu tiên tôi muốn sử dụng 16 lõi của chủ và trường hợp thứ hai tôi muốn sử dụng 8cores x 3 của nô lệ.

Ví dụ đơn giản:

val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8)) 
println(local.sparkContext.makeRDD(arr).count()) 
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum) 

My hai SparkContexts:

object local { 

    val project = "test" 
    val version = "1.0" 

    val sc = new SparkConf() 
    .setMaster("local[16]") 
    .setAppName("Local") 
    .set("spark.local.dir", "/mnt") 
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar")) 
    .setSparkHome("/root/spark") 
    .set("spark.driver.allowMultipleContexts", "true") 
    .set("spark.executor.memory", "45g") 

    val sparkContext = new SparkContext(sc) 
} 

object cluster { 

    val project = "test" 
    val version = "1.0" 

    val sc = new SparkConf() 
    .setMaster(masterURL) // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com 
    .setAppName("Cluster") 
    .set("spark.local.dir", "/mnt") 
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars) 
    .setSparkHome("/root/spark") 
    .set("spark.driver.allowMultipleContexts", "true") 
    .set("spark.executor.memory", "35g") 

    val sparkContext = new SparkContext(sc) 
} 

Làm thế nào tôi có thể sửa lỗi này?

+1

Bạn có thể đưa ra lý do bạn muốn sử dụng hai bối cảnh không? Thường xuyên hơn không phải điều này là không cần thiết – Gillespie

+0

@Gillespie, Hãy nói rằng tôi có 3 chương trình: prog1 và prog3 có thể chạy song song và prog2 phải được tuần tự. Prog1 của ouput là 15 RDDs (datasets). Prog2 là một thuật toán học máy mà tôi cần chạy 15 lần. Vì prog2 phải chạy trên 1 lõi cục bộ. Tôi đã thực hiện một hack nhỏ mà là để làm cho một RDD trong đó có 15 bộ dữ liệu thu thập được. Tôi ánh xạ trên RDD này và chạy prog2 trên mỗi bản ghi. Prog3 lấy 15 kết quả của prog2 và thực hiện một số thao tác song song. Tôi hy vọng điều này là rõ ràng? Tôi nghĩ trong trường hợp của tôi điều này là bắt buộc, nhưng nếu không, tôi cũng muốn biết câu trả lời của ví dụ nhỏ của tôi. – GermainGum

Trả lời

10

Mặc dù tùy chọn cấu hình spark.driver.allowMultipleContext tồn tại, nó gây hiểu nhầm vì việc sử dụng nhiều ngữ cảnh Spark không được khuyến khích. Tùy chọn này chỉ được sử dụng cho các thử nghiệm nội bộ Spark và không được sử dụng trong các chương trình người dùng. Bạn có thể nhận được kết quả không mong muốn trong khi chạy nhiều hơn một bối cảnh Spark trong một JVM đơn lẻ.

+0

có khuyến khích tài liệu này ở bất kỳ đâu không? Tôi muốn nó là đúng rằng 2 là nản lòng, nhưng tôi muốn thấy rằng một nơi nào đó chính thức nếu có thể – Kristian

+0

Giới hạn này được cho là đã nâng lên trong tia lửa 2.0 Tôi đang nhìn vào nó. – javadba

1

Nếu phối hợp được yêu cầu giữa 2 chương trình, thì tốt hơn là nên làm cho nó trở thành một phần của ứng dụng Spark đơn lẻ để tận dụng tối ưu hóa nội bộ của Sparks và tránh các i/o không cần thiết.

Thứ hai, nếu 2 ứng dụng không cần phối hợp theo bất kỳ cách nào, bạn có thể khởi chạy 2 ứng dụng riêng biệt. Vì bạn đang sử dụng Amazon EC2/EMR, bạn có thể sử dụng YARN làm người quản lý tài nguyên của bạn mà không cần đầu tư thời gian đáng kể như được mô tả here.

1

Nếu bạn cần phải làm việc với nhiều bối cảnh Spark, bạn có thể bật tùy chọn đặc biệt [MultipleContexts] (1), nhưng chỉ được sử dụng cho các kiểm tra nội bộ Spark và không được sử dụng trong các chương trình người dùng. Bạn sẽ nhận được hành vi không mong muốn khi chạy nhiều hơn một bối cảnh Spark trong một JVM duy nhất [SPARK-2243] (2). Tuy nhiên, có thể tạo các bối cảnh khác nhau trong các JVM riêng biệt và quản lý các bối cảnh ở cấp SparkConf, điều này sẽ tối ưu phù hợp với các công việc thực thi.

Nó trông như thế này: Mist creates every new Sparkcontext in its own JVM.

Có một middleware trên đầu trang của Spark - [Mist]. Nó quản lý các bối cảnh Spark và nhiều JVM, vì vậy bạn có thể có các công việc khác nhau như đường ống ETL, công việc dự báo nhanh, truy vấn Hive quảng cáo và ứng dụng phát trực tiếp chạy song song trên cùng một cụm.

1> github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/SparkContextSuite.scala#L67

2> issues.apache.org/jira/duyệt/SPARK-2243

0

Java:

.set("spark.driver.allowMultipleContexts", "true") 

+

sparkContext.cancelAllJobs(); 
sparkContext.stop(); 

Nó làm việc cho tôi.

Các vấn đề liên quan