Hãy bắt đầu với một chức năng đơn giản mà luôn luôn trả về một số nguyên ngẫu nhiên:Số ngẫu nhiên thế hệ trong PySpark
import numpy as np
def f(x):
return np.random.randint(1000)
và RDD điền với số không và ánh xạ sử dụng f
:
rdd = sc.parallelize([0] * 10).map(f)
Từ trên RDD là không tiếp tục tồn tại Tôi hy vọng tôi sẽ nhận được một đầu ra khác nhau mỗi khi tôi thu thập:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
Nếu chúng ta bỏ qua thực tế rằng việc phân phối các giá trị không thực sự trông ngẫu nhiên thì nó ít nhiều xảy ra. Vấn đề bắt đầu chúng ta khi chỉ mất một yếu tố đầu tiên:
assert len(set(rdd.first() for _ in xrange(100))) == 1
hoặc
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
Dường như để trả lại số giống nhau mỗi lần. Tôi đã có thể tái tạo hành vi này trên hai máy khác nhau với Spark 1.2, 1.3 và 1.4. Ở đây tôi đang sử dụng np.random.randint
nhưng nó hoạt động theo cùng một cách với random.randint
.
Vấn đề này, tương tự như kết quả không chính xác-ngẫu nhiên với collect
, có vẻ là Python cụ thể và tôi không thể tái tạo nó bằng Scala:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
đã làm tôi nhớ một cái gì đó rõ ràng ở đây?
Sửa:
Hóa ra nguồn gốc của vấn đề là Python RNG thực hiện. Để báo giá official documentation:
Các chức năng được cung cấp bởi mô-đun này thực sự là phương pháp ràng buộc của một thể hiện ẩn của lớp ngẫu nhiên.Random. Bạn có thể khởi tạo các trường hợp ngẫu nhiên của riêng mình để nhận các trình tạo không chia sẻ trạng thái.
tôi giả NumPy hoạt động theo cách tương tự và viết lại f
sử dụng RandomState
dụ như sau
import os
import binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
làm cho nó chậm hơn nhưng giải quyết vấn đề.
Mặc dù ở trên không giải thích được kết quả ngẫu nhiên từ việc thu thập nhưng tôi vẫn không hiểu nó ảnh hưởng như thế nào đến first
/take(1)
giữa nhiều tác vụ.
Chỉ cần làm rõ: nếu tôi đang sử dụng chức năng ngẫu nhiên của Numpy trong Spark, Luôn chọn kết quả tương tự trong mỗi phân vùng? Tôi có thể sử dụng np.random.choice như thế nào? – member555
_Nó luôn luôn chọn cùng một kết quả trong mỗi phân vùng_ - không chính xác, nhưng các giá trị được tính trên một công nhân đơn lẻ sẽ không độc lập. _Làm thế nào tôi có thể sử dụng np.random.choice vì vậy nó sẽ là ngẫu nhiên? _ - Tôi đã mô tả giải pháp trong một chỉnh sửa. Bạn nên sử dụng một trạng thái riêng biệt. Vì nó là khá tốn kém, bạn có thể muốn khởi tạo nó một lần cho mỗi phân vùng. – zero323
Bạn có thể giải thích cho tôi chi tiết hơn về vấn đề này không? tại sao trạng thái chia sẻ của python là một vấn đề? – member555