Tôi đang cố gắng làm một số văn bản NLP làm sạch một số cột Unicode trong PySpark DataFrame. Tôi đã thử trong Spark 1.3, 1.5 và 1.6 và dường như không thể làm mọi thứ để làm việc cho cuộc sống của tôi. Tôi cũng đã thử sử dụng Python 2.7 và Python 3.4.Pyspark DataFrame UDF trên cột văn bản
Tôi đã tạo một udf cực kỳ đơn giản như được thấy bên dưới mà chỉ cần trả lại chuỗi cho mỗi bản ghi trong cột mới. Các chức năng khác sẽ thao tác văn bản và sau đó trả lại văn bản đã thay đổi trở lại trong một cột mới.
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
def dummy_function(data_str):
cleaned_str = 'dummyData'
return cleaned_str
dummy_function_udf = udf(dummy_function, StringType())
Một số dữ liệu mẫu có thể được giải nén từ here.
Đây là mã tôi sử dụng để nhập dữ liệu và sau đó áp dụng udf trên.
# Load a text file and convert each line to a Row.
lines = sc.textFile("classified_tweets.txt")
parts = lines.map(lambda l: l.split("\t"))
training = parts.map(lambda p: (p[0], p[1]))
# Create dataframe
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
training_df.show(5)
+--------------------+--------------+
| tweet|classification|
+--------------------+--------------+
|rt @jiffyclub: wi...| python|
|rt @arnicas: ipyt...| python|
|rt @treycausey: i...| python|
|what's my best op...| python|
|rt @raymondh: #py...| python|
+--------------------+--------------+
# Apply UDF function
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
df.show(5)
Khi tôi chạy df.show (5), tôi nhận được lỗi sau. Tôi hiểu rằng vấn đề rất có thể không xuất phát từ chương trình() nhưng dấu vết không giúp tôi nhiều.
---------------------------------------------------------------------------Py4JJavaError Traceback (most recent call last)<ipython-input-19-0b21c233c724> in <module>()
1 df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
----> 2 df.show(5)
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n, truncate)
255 +---+-----+
256 """
--> 257 print(self._jdf.showString(n, truncate))
258
259 def __repr__(self):
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling o474.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
chức năng thực tế tôi đang cố gắng:
def tag_and_remove(data_str):
cleaned_str = ' '
# noun tags
nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
# adjectives
jj_tags = ['JJ', 'JJR', 'JJS']
# verbs
vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
nltk_tags = nn_tags + jj_tags + vb_tags
# break string into 'words'
text = data_str.split()
# tag the text and keep only those with the right tags
tagged_text = pos_tag(text)
for tagged_word in tagged_text:
if tagged_word[1] in nltk_tags:
cleaned_str += tagged_word[0] + ' '
return cleaned_str
tag_and_remove_udf = udf(tag_and_remove, StringType())
Bạn có chắc chắn 'l.split ('\ t') 'trả về nhiều hơn một mục? Lỗi chỉ mục có thể là từ 'training = parts.map (...)'. Dữ liệu của bạn trông như thế nào - bạn có chắc chắn có các tab được sử dụng ở mọi nơi không? – AChampion
Có, tôi có thể xác nhận rằng dữ liệu có hai cột. Tôi đã quét sạch dữ liệu của tất cả khoảng trắng bên cạnh các khoảng trống trước khi đưa vào flatfile. Tôi sẽ đặt một mẫu nhỏ lên trên. – dreyco676
Bạn không chia nhỏ trên các khoảng trống - chỉ các tab - 'l.split()' sẽ phân chia trên bất kỳ khoảng trắng nào. – AChampion