2014-04-30 28 views
12

Tôi đang cố gắng thêm các phần tử vào bản đồ trong khi lặp lại các phần tử của RDD. Tôi không nhận được bất kỳ lỗi nào, nhưng các sửa đổi không xảy ra.Sửa đổi bộ sưu tập bên trong Spark RDD foreach

Tất cả đều hoạt động tốt thêm trực tiếp hoặc lặp lại các bộ sưu tập khác:

scala> val myMap = new collection.mutable.HashMap[String,String] 
myMap: scala.collection.mutable.HashMap[String,String] = Map() 

scala> myMap("test1")="test1" 

scala> myMap 
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1) 

scala> List("test2", "test3").foreach(w => myMap(w) = w) 

scala> myMap 
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3) 

Nhưng khi tôi cố gắng làm điều tương tự từ một RDD:

scala> val fromFile = sc.textFile("tests.txt") 
... 
scala> fromFile.take(3) 
... 
res48: Array[String] = Array(test4, test5, test6) 

scala> fromFile.foreach(w => myMap(w) = w) 
scala> myMap 
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3) 

Tôi đã thử in các nội dung của bản đồ như trước khi foreach đảm bảo biến là như nhau và nó in chính xác:

fromFile.foreach(w => println(myMap("test1"))) 
... 
test1 
test1 
test1 
... 

Tôi cũng đã in phần tử đã sửa đổi của bản đồ bên trong mã foreach và nó in như đã sửa đổi, nhưng khi thao tác hoàn tất, bản đồ có vẻ chưa được sửa đổi.

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))}) 
... 
test4 
test5 
test6 
... 
scala> myMap 
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3) 

Chuyển đổi các RDD để một mảng (thu thập) cũng hoạt động tốt:

fromFile.collect.foreach(w => myMap(w) = w) 
scala> myMap 
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3) 

Đây có phải là một vấn đề bối cảnh? Tôi có đang truy cập bản sao dữ liệu đang được sửa đổi ở một nơi khác không?

Trả lời

30

Nó trở nên rõ ràng hơn khi chạy trên cụm Spark (không phải một máy đơn lẻ). RDD hiện được trải rộng trên một số máy. Khi bạn gọi foreach, bạn yêu cầu mỗi máy phải làm gì với phần của RDD mà nó có. Nếu bạn tham khảo bất kỳ biến cục bộ nào (như myMap), chúng sẽ được sắp xếp theo thứ tự và được gửi tới máy, để chúng có thể sử dụng nó. Nhưng không có gì trở lại. Vì vậy, bản gốc của bạn myMap không bị ảnh hưởng.

Tôi nghĩ rằng điều này trả lời câu hỏi của bạn, nhưng rõ ràng là bạn đang cố gắng hoàn thành một cái gì đó và bạn sẽ không thể đạt được điều đó theo cách này. Cảm thấy tự do để giải thích ở đây hoặc trong một câu hỏi riêng biệt những gì bạn đang cố gắng làm, và tôi sẽ cố gắng giúp đỡ.

+0

Nó thực sự trả lời câu hỏi của tôi, và đừng lo lắng về những gì tôi đã cố gắng để thực hiện, tôi chỉ tìm thấy đây là một trường hợp thú vị tôi đã không có một lời giải thích cho. Tôi làm bây giờ, cảm ơn! – palako

+3

Có như Daniel chỉ ra bạn không thể biến đổi nhà nước, palako là loại thiếu điểm lập trình chức năng. Bạn không phải là nhà nước đột biến như sau đó bạn không thể parallerize. Bằng cách thiết kế mã theo cách mà bạn không biến đổi trạng thái, mã của bạn có thể parallerize miễn phí và bạn có thể sử dụng các khung như Spark và Scalding để phân phối trên một cụm. – samthebest

+0

@Daniel Có cách giải quyết nào cho vấn đề này không? –

Các vấn đề liên quan