2014-11-16 15 views
6

Tôi đang cố tạo các biến phát sóng từ bên trong các phương thức Python (cố gắng trừu tượng một số phương thức tiện ích mà tôi đang tạo dựa vào các hoạt động phân tán). Tuy nhiên, tôi dường như không thể truy cập các biến phát sóng từ bên trong công nhân Spark.PySpark phát các biến từ các chức năng cục bộ

Hãy nói rằng tôi đã thiết lập này:

def main(): 
    sc = SparkContext() 
    SomeMethod(sc) 

def SomeMethod(sc): 
    someValue = rand() 
    V = sc.broadcast(someValue) 
    A = sc.parallelize().map(worker) 

def worker(element): 
    element *= V.value ### NameError: global name 'V' is not defined ### 

Tuy nhiên, nếu tôi thay vì loại bỏ SomeMethod() trung gian, nó hoạt động tốt.

def main(): 
    sc = SparkContext() 
    someValue = rand() 
    V = sc.broadcast(someValue) 
    A = sc.parallelize().map(worker) 

def worker(element): 
    element *= V.value # works just fine 

Tôi không muốn đặt tất cả logic Spark của mình theo phương pháp chính, nếu có thể. Có cách nào để phát các biến từ bên trong các hàm cục bộ và để chúng có thể hiển thị trên toàn cầu đối với các nhân viên Spark không?

Cách khác, mẫu thiết kế tốt cho loại tình huống này - ví dụ: tôi muốn viết một phương pháp đặc biệt cho Spark là khép kín và thực hiện một chức năng cụ thể mà tôi muốn sử dụng lại?

Trả lời

7

Tôi không chắc chắn tôi hoàn toàn hiểu câu hỏi nhưng, nếu bạn cần V đối tượng bên trong hàm nhân bạn sau đó bạn chắc chắn nên vượt qua nó như một tham số, nếu không phương pháp này là không thực sự khép kín:

def worker(V, element): 
    element *= V.value 

Bây giờ để sử dụng nó trong các chức năng bản đồ mà bạn cần phải sử dụng một phần, vì vậy bản đồ mà chỉ nhìn thấy một hàm 1 tham số:

from functools import partial 

def SomeMethod(sc): 
    someValue = rand() 
    V = sc.broadcast(someValue) 
    A = sc.parallelize().map(partial(worker, V=V)) 
+0

có bất kỳ tác động hiệu suất trong việc thông qua các biến phát sóng xung quanh như thế này? Ví dụ, tôi đã dựa vào một biến phát sóng trong một hàm map() trên hàng chục nghìn (hoặc nhiều) hàng. cái gì đó như 'def transform (hàng): return broadcast_variable.value [row [0]] ' mà sau đó được sử dụng trong một' bản đồ()' chức năng như 'rdd.map (transform) ' – iralls

+0

Cảm ơn giải pháp này đã giúp tôi tránh việc sử dụng toàn cầu cho biến phát sóng. Xin lưu ý rằng bạn nên thay thế thứ tự của các tham số phương thức công nhân sao cho tham số 'yếu tố' (được điền bởi khung công tác Spark) sẽ là đầu tiên. Nếu không nó sẽ không hoạt động. –

Các vấn đề liên quan