Nói tóm lại:Spark nhiều ngữ cảnh
cụm EC2: 1 thạc sĩ 3 nô lệ phiên bản
Spark: 1.3.1
Tôi muốn sử dụng tùy chọn spark.driver.allowMultipleContexts, một ngữ cảnh cục bộ (chỉ cái chính) và một cụm (chủ và nô lệ).
tôi nhận được lỗi stacktrace này (dòng 29 là nơi mà tôi gọi là đối tượng mà khởi tạo sparkcontext giây):
fr.entry.Main.main(Main.scala)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
at fr.entry.cluster$.<init>(Main.scala:79)
at fr.entry.cluster$.<clinit>(Main.scala)
at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at fr.entry.Main$.main(Main.scala:14)
at fr.entry.Main.main(Main.scala)
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/2 is now LOADING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/0 is now RUNNING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Thông tin chi tiết:
Tôi muốn chạy một chương trình mà không hai điều. Thứ nhất, tôi có một địa chỉ sparkContext (chỉ trên master), tôi tạo một RDD và thực hiện một số thao tác. Thứ hai, tôi có một khởi tạo sparkContext thứ hai với một master và 3 slaves mà cũng tạo một RDD và thực hiện một số hoạt động. Vì vậy, trong trường hợp đầu tiên tôi muốn sử dụng 16 lõi của chủ và trường hợp thứ hai tôi muốn sử dụng 8cores x 3 của nô lệ.
Ví dụ đơn giản:
val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(local.sparkContext.makeRDD(arr).count())
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)
My hai SparkContexts:
object local {
val project = "test"
val version = "1.0"
val sc = new SparkConf()
.setMaster("local[16]")
.setAppName("Local")
.set("spark.local.dir", "/mnt")
.setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
.setSparkHome("/root/spark")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.executor.memory", "45g")
val sparkContext = new SparkContext(sc)
}
object cluster {
val project = "test"
val version = "1.0"
val sc = new SparkConf()
.setMaster(masterURL) // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
.setAppName("Cluster")
.set("spark.local.dir", "/mnt")
.setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
.setSparkHome("/root/spark")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.executor.memory", "35g")
val sparkContext = new SparkContext(sc)
}
Làm thế nào tôi có thể sửa lỗi này?
Bạn có thể đưa ra lý do bạn muốn sử dụng hai bối cảnh không? Thường xuyên hơn không phải điều này là không cần thiết – Gillespie
@Gillespie, Hãy nói rằng tôi có 3 chương trình: prog1 và prog3 có thể chạy song song và prog2 phải được tuần tự. Prog1 của ouput là 15 RDDs (datasets). Prog2 là một thuật toán học máy mà tôi cần chạy 15 lần. Vì prog2 phải chạy trên 1 lõi cục bộ. Tôi đã thực hiện một hack nhỏ mà là để làm cho một RDD trong đó có 15 bộ dữ liệu thu thập được. Tôi ánh xạ trên RDD này và chạy prog2 trên mỗi bản ghi. Prog3 lấy 15 kết quả của prog2 và thực hiện một số thao tác song song. Tôi hy vọng điều này là rõ ràng? Tôi nghĩ trong trường hợp của tôi điều này là bắt buộc, nhưng nếu không, tôi cũng muốn biết câu trả lời của ví dụ nhỏ của tôi. – GermainGum