2016-05-24 21 views
8

Tôi có một khung dữ liệu tia lửa có cấu trúc sau. BodyText_token có các thẻ (được xử lý/tập hợp các từ). Và tôi có danh sách lồng nhau của các từ khóa được xác địnhChuyển một cột khung dữ liệu và danh sách bên ngoài tới udf trong withColumn

root 
|-- id: string (nullable = true) 
|-- body: string (nullable = true) 
|-- bodyText_token: array (nullable = true) 

keyword_list=['union','workers','strike','pay','rally','free','immigration',], 
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']] 

Tôi cần kiểm tra số lượng thẻ nằm trong mỗi danh sách từ khóa và thêm kết quả làm cột mới của khung dữ liệu hiện có. Ví dụ: nếu tokens =["become", "farmer","rally","workers","student"] kết quả sẽ là -> [1,2,0]

Chức năng sau đây hoạt động như mong đợi.

def label_maker_topic(tokens,topic_words): 
    twt_list = [] 
    for i in range(0, len(topic_words)): 
     count = 0 
     #print(topic_words[i]) 
     for tkn in tokens: 
      if tkn in topic_words[i]: 
       count += 1 
     twt_list.append(count) 

    return twt_list 

Tôi đã sử dụng udf trong withColumn để truy cập chức năng và tôi gặp lỗi. Tôi nghĩ rằng đó là về việc chuyển một danh sách bên ngoài sang một udf. Có cách nào tôi có thể vượt qua danh sách bên ngoài và cột datafram để một udf và thêm một cột mới vào dataframe của tôi?

topicWord = udf(label_maker_topic,StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list)) 

Trả lời

20

Giải pháp sạch nhất là phải vượt qua đối số bổ sung sử dụng đóng cửa:

def make_topic_word(topic_words): 
    return udf(lambda c: label_maker_topic(c, topic_words)) 

df = sc.parallelize([(["union"],)]).toDF(["tokens"]) 

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) 
    .show()) 

này không đòi hỏi bất kỳ thay đổi trong keyword_list hoặc chức năng bạn quấn với UDF. Bạn cũng có thể sử dụng phương thức này để truyền một đối tượng tùy ý. Điều này có thể được sử dụng để vượt qua ví dụ: danh sách sets để tra cứu hiệu quả.

Nếu bạn muốn sử dụng UDF hiện tại của bạn và vượt qua topic_words trực tiếp bạn sẽ phải chuyển nó sang một cột đen đầu tiên:

from pyspark.sql.functions import array, lit 

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list]) 
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show() 

Tùy thuộc vào dữ liệu và các yêu cầu có thể thay thế, các giải pháp hiệu quả hơn của bạn, không yêu cầu UDF (phát nổ + tổng hợp + thu gọn) hoặc tra cứu (băm + hoạt động vectơ).

7

Các công việc sau tốt nơi bất kỳ thông số bên ngoài có thể được truyền cho UDF (mã tinh chỉnh để giúp bất cứ ai)

topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token)) 
+0

này hoạt động nhưng tôi sẽ phải cẩn thận với điều này, bởi vì udf sẽ có 'topic_words 'giá trị tại thời điểm udf được xác định. Vì vậy, việc thay đổi 'topic_words' và tái sử dụng udf sau sẽ không hoạt động - nó vẫn sẽ sử dụng giá trị' topic_words' tại thời điểm udf được xác định. – CHP

Các vấn đề liên quan