2016-04-13 19 views
13

Tôi mới sử dụng tia lửa & pyspark.pyspark EOFError sau khi gọi bản đồ

Tôi đang đọc tệp csv nhỏ (~ 40k) vào một khung dữ liệu.

from pyspark.sql import functions as F 
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv') 
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) 
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF() 

tôi nhận được một số lỗi lạ mà không xảy ra mỗi lần duy nhất, nhưng không xảy ra khá thường xuyên

>>> df2.show(1) 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

>>> df2.count() 
41999                   
>>> df2.show(1) 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

>>> df2.count() 
41999                   
>>> df2.show(1) 
Traceback (most recent call last): 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker  
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main 
    if read_int(infile) == SpecialLengths.END_OF_STREAM: 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int 
    raise EOFError 
EOFError 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

Khi EOFError mà đã được nâng lên, tôi sẽ không nhìn thấy nó một lần nữa cho đến khi tôi làm điều gì đó yêu cầu tương tác với máy chủ spark

Khi tôi gọi df2.count(), nó cho thấy lời nhắc [Stage xxx] là ý tôi muốn nói đến máy chủ tia lửa. Bất cứ điều gì mà gây ra mà dường như cuối cùng kết thúc cho EOFError một lần nữa khi tôi làm điều gì đó với df2.

Nó dường như không xảy ra với df (so với df2) vì vậy có vẻ như nó phải là một cái gì đó xảy ra với dòng df.map().

+1

Tôi đã nghe từ danh sách người dùng tia lửa rằng thông báo này chỉ hơi dài dòng và có thể bỏ qua. – Pete

+0

Pete, bạn có thể chỉ cho chúng tôi tại lưu trữ không? – rjurney

+0

Tôi đã tìm kiếm danh sách người dùng spark và không thể tìm thấy bất cứ điều gì về điều này liên quan đến EOFError: ( – rjurney

Trả lời

0

Bạn có thể thử làm bản đồ sau khi chuyển dataframe thành rdd hay không. Bạn đang áp dụng chức năng bản đồ trên một khung dữ liệu và sau đó tạo lại một khung dữ liệu từ đó.Syntax sẽ giống như

df.rdd.map().toDF() 

Vui lòng cho tôi biết nếu nó hoạt động. Cảm ơn.

0

Tôi tin rằng bạn đang chạy Spark 2.x trở lên. Dưới đây mã nên tạo dataframe của bạn từ csv:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv") 

sau đó bạn có thể có dưới mã:

df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) 

và sau đó bạn có thể tạo df2 mà không Row và toDF()

Hãy cho tôi biết nếu điều này hoạt động hoặc nếu bạn đang sử dụng Spark 1.6 ... cảm ơn.

Các vấn đề liên quan