2016-01-15 30 views
7

Tôi đang cố gắng làm một số văn bản NLP làm sạch một số cột Unicode trong PySpark DataFrame. Tôi đã thử trong Spark 1.3, 1.5 và 1.6 và dường như không thể làm mọi thứ để làm việc cho cuộc sống của tôi. Tôi cũng đã thử sử dụng Python 2.7 và Python 3.4.Pyspark DataFrame UDF trên cột văn bản

Tôi đã tạo một udf cực kỳ đơn giản như được thấy bên dưới mà chỉ cần trả lại chuỗi cho mỗi bản ghi trong cột mới. Các chức năng khác sẽ thao tác văn bản và sau đó trả lại văn bản đã thay đổi trở lại trong một cột mới.

import pyspark 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql import SQLContext 
from pyspark.sql.functions import udf 

def dummy_function(data_str): 
    cleaned_str = 'dummyData' 
    return cleaned_str 

dummy_function_udf = udf(dummy_function, StringType()) 

Một số dữ liệu mẫu có thể được giải nén từ here.

Đây là mã tôi sử dụng để nhập dữ liệu và sau đó áp dụng udf trên.

# Load a text file and convert each line to a Row. 
lines = sc.textFile("classified_tweets.txt") 
parts = lines.map(lambda l: l.split("\t")) 
training = parts.map(lambda p: (p[0], p[1])) 

# Create dataframe 
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"]) 

training_df.show(5) 
+--------------------+--------------+ 
|    tweet|classification| 
+--------------------+--------------+ 
|rt @jiffyclub: wi...|  python| 
|rt @arnicas: ipyt...|  python| 
|rt @treycausey: i...|  python| 
|what's my best op...|  python| 
|rt @raymondh: #py...|  python| 
+--------------------+--------------+ 

# Apply UDF function 
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
df.show(5) 

Khi tôi chạy df.show (5), tôi nhận được lỗi sau. Tôi hiểu rằng vấn đề rất có thể không xuất phát từ chương trình() nhưng dấu vết không giúp tôi nhiều.

---------------------------------------------------------------------------Py4JJavaError        Traceback (most recent call last)<ipython-input-19-0b21c233c724> in <module>() 
     1 df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
----> 2 df.show(5) 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n, truncate) 
    255   +---+-----+ 
    256   """ 
--> 257   print(self._jdf.showString(n, truncate)) 
    258 
    259  def __repr__(self): 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(
Py4JJavaError: An error occurred while calling o474.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125) 
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913) 
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929) 
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968) 
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) 
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) 
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) 
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) 
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) 
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125) 
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913) 
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929) 
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968) 
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

chức năng thực tế tôi đang cố gắng:

def tag_and_remove(data_str): 
    cleaned_str = ' ' 
    # noun tags 
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS'] 
    # adjectives 
    jj_tags = ['JJ', 'JJR', 'JJS'] 
    # verbs 
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'] 
    nltk_tags = nn_tags + jj_tags + vb_tags 

    # break string into 'words' 
    text = data_str.split() 

    # tag the text and keep only those with the right tags 
    tagged_text = pos_tag(text) 
    for tagged_word in tagged_text: 
     if tagged_word[1] in nltk_tags: 
      cleaned_str += tagged_word[0] + ' ' 

    return cleaned_str 


tag_and_remove_udf = udf(tag_and_remove, StringType()) 
+2

Bạn có chắc chắn 'l.split ('\ t') 'trả về nhiều hơn một mục? Lỗi chỉ mục có thể là từ 'training = parts.map (...)'. Dữ liệu của bạn trông như thế nào - bạn có chắc chắn có các tab được sử dụng ở mọi nơi không? – AChampion

+0

Có, tôi có thể xác nhận rằng dữ liệu có hai cột. Tôi đã quét sạch dữ liệu của tất cả khoảng trắng bên cạnh các khoảng trống trước khi đưa vào flatfile. Tôi sẽ đặt một mẫu nhỏ lên trên. – dreyco676

+2

Bạn không chia nhỏ trên các khoảng trống - chỉ các tab - 'l.split()' sẽ phân chia trên bất kỳ khoảng trắng nào. – AChampion

Trả lời

3

Tập dữ liệu của bạn không sạch. 985 dòng split('\t') để chỉ một giá trị:

>>> from operator import add 
>>> lines = sc.textFile("classified_tweets.txt") 
>>> parts = lines.map(lambda l: l.split("\t")) 
>>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect() 
[(2, 149195), (1, 985)] 
>>> parts.filter(lambda l: len(l) == 1).take(5) 
[['"show me the money!” at what point do you start trying to monetize your #startup? tweet us with #startuplife.'], 
['a good pitch can mean money in the bank for your #startup. see how body language plays a key role: (via: ajalumnify)'], 
['100+ apps in five years? @2359media did it using microsoft #azure: #azureapps'], 
['does buying better coffee make you a better leader? little things can make a big difference: (via: @jmbrandonbb)'], 
['[email protected] graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']] 

Vì vậy, thay đổi mã của bạn để:

>>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip())) 
>>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"]) 
>>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
>>> df.show(5) 
+--------------------+--------------+---------+ 
|    tweet|classification| dummy| 
+--------------------+--------------+---------+ 
|rt @jiffyclub: wi...|  python|dummyData| 
|rt @arnicas: ipyt...|  python|dummyData| 
|rt @treycausey: i...|  python|dummyData| 
|what's my best op...|  python|dummyData| 
|rt @raymondh: #py...|  python|dummyData| 
+--------------------+--------------+---------+ 
only showing top 5 rows 
+0

Cảm ơn bạn. Tôi đã học được rằng chương trình() không nhất thiết làm cho việc phân tích cú pháp đầy đủ xảy ra nếu không có nhu cầu cho N được chỉ định. – dreyco676

4

Tôi nghĩ rằng bạn đang misdefining vấn đề, và có thể đơn giản hóa lambda của bạn cho các mục đích của câu hỏi này nhưng giấu giếm vấn đề thực sự.

stack trace bạn đọc

File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda> 
IndexError: list index out of range 

Khi tôi chạy mã này nó hoạt động tốt:

import pyspark 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql import SQLContext 
from pyspark.sql.functions import udf 

training_df = sqlContext.sql("select 'foo' as tweet, 'bar' as classification") 

def dummy_function(data_str): 
    cleaned_str = 'dummyData' 
    return cleaned_str 

dummy_function_udf = udf(dummy_function, StringType()) 
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet'])) 
df.show() 

+-----+--------------+---------+ 
|tweet|classification| dummy| 
+-----+--------------+---------+ 
| foo|   bar|dummyData| 
+-----+--------------+---------+ 

Bạn có chắc chắn không có một số lỗi khác trong dummy_function_udf của bạn? Bạn đang sử dụng udf 'thực' nào - ngoài phiên bản mẫu này?

+0

Cảm ơn bạn rất nhiều vì đã trả lời. Dường như dữ liệu văn bản luôn là điều ác và sẽ phá vỡ các trình phân tích cú pháp của bạn. Tôi mong đợi bất kỳ lỗi nào với phân tích cú pháp để hiển thị với training_df.show (5) nhưng có vẻ như nó chỉ phân tích các bản ghi N đầu tiên nếu được thực hiện mà không có biến đổi nào khác. – dreyco676

+0

Cảm ơn câu trả lời. Tôi đã có một vấn đề tương tự. Tôi có thể theo dõi và hỏi "udf" có nghĩa là gì ở đây không? Tôi đã sao chép mã vào vỏ của tôi chỉ để tìm lỗi sau 'Traceback (cuộc gọi gần đây nhất): Tệp "", dòng 1, trong NameError: name' udf 'không được xác định' – yuqli

+0

Chức năng do người dùng xác định. –

Các vấn đề liên quan