2016-04-06 15 views
7

Tôi đã viết một phương pháp phải xem xét một số ngẫu nhiên để mô phỏng phân phối Bernoulli. Tôi đang sử dụng random.nextDouble để tạo số từ 0 đến 1 sau đó đưa ra quyết định dựa trên giá trị đó cho tham số xác suất của tôi.Spark - Số ngẫu nhiên Thế hệ

Vấn đề của tôi là Spark đang tạo ra các số ngẫu nhiên giống nhau trong mỗi lần lặp của hàm lập bản đồ vòng lặp của tôi. Tôi đang sử dụng API DataFrame. Mã của tôi sau định dạng này:

val myClass = new MyClass() 
val M = 3 
val myAppSeed = 91234 
val rand = new scala.util.Random(myAppSeed) 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 

Đây là lớp:

class myClass extends Serializable { 
    val q = qProb 

    def myMethod(s: String, rand: Double) = { 
    if (rand <= q) // do something 
    else // do something else 
    } 
} 

Tôi cần một số ngẫu nhiên mới mỗi khi myMethod được gọi. Tôi cũng đã cố gắng tạo ra các số bên phương pháp của tôi với java.util.Random (scala.util.Random v10 không mở rộng Serializable) như dưới đây, nhưng tôi vẫn nhận được những con số giống nhau trong mỗi vòng lặp for

val r = new java.util.Random(s.hashCode.toLong) 
val rand = r.nextDouble() 

tôi đã thực hiện một số nghiên cứu, và dường như điều này có liên quan đến tính chất xác định của Sparks.

Trả lời

2

Lý do tại sao cùng một chuỗi được lặp đi lặp lại là các máy phát điện ngẫu nhiên được tạo ra và khởi tạo với một hạt giống trước khi dữ liệu được phân chia. Mỗi phân vùng sau đó bắt đầu từ cùng một hạt giống ngẫu nhiên. Có lẽ không phải là cách hiệu quả nhất để làm điều đó, nhưng những điều sau đây nên làm việc:

val myClass = new MyClass() 
val M = 3 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{ 
     val rand = scala.util.Random 
     row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 
+0

của tôi sửa đổi này hơi để giải quyết vấn đề của tôi . Tôi đã chuyển val ngẫu nhiên vào phương thức của tôi và tạo ra các số ngẫu nhiên từ trong đó. Điều này giải quyết vấn đề của tôi, nhưng tôi đã phải sử dụng 'java.util.Random' cho lý do serializeability. –

4

Chỉ cần sử dụng chức năng SQL rand:

import org.apache.spark.sql.functions._ 

//df: org.apache.spark.sql.DataFrame = [key: int] 

df.select($"key", rand() as "rand").show 
+---+-------------------+ 
|key|    rand| 
+---+-------------------+ 
| 1| 0.8635073400704648| 
| 2| 0.6870153659986652| 
| 3|0.18998048357873532| 
+---+-------------------+ 


df.select($"key", rand() as "rand").show 
+---+------------------+ 
|key|    rand| 
+---+------------------+ 
| 1|0.3422484248879837| 
| 2|0.2301384925817671| 
| 3|0.6959421970071372| 
+---+------------------+ 
+0

này không giải quyết vấn đề khá của tôi, nhưng một giải pháp thanh lịch mà tôi có thể sẽ được sử dụng trong tương lai, vì vậy 1 –

2

Theo this post, giải pháp tốt nhất không phải là để đặt new scala.util.Random bên trong bản đồ, cũng không phải hoàn toàn bên ngoài (ví dụ trong mã trình điều khiển.), nhưng trong một trung gian mapPartitionsWithIndex:

import scala.util.Random 
val myAppSeed = 91234 
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) => 
    val rand = new scala.util.Random(indx+myAppSeed) 
    iter.map(x => (x, Array.fill(10)(rand.nextDouble))) 
}