53

Tôi có một DataFrame Spark (sử dụng PySpark 1.5.1) và muốn thêm một cột mới.Tôi làm cách nào để thêm cột mới vào Spark DataFrame (sử dụng PySpark)?

Tôi đã thử những điều sau đây mà không cần bất kỳ thành công:

type(randomed_hours) # => list 

# Create in Python and transform to RDD 

new_col = pd.DataFrame(randomed_hours, columns=['new_col']) 

spark_new_col = sqlContext.createDataFrame(new_col) 

my_df_spark.withColumn("hours", spark_new_col["new_col"]) 

Cũng có một lỗi sử dụng này:

my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) 

Vậy làm thế nào để tôi thêm một cột mới (dựa trên Python vector) để một DataFrame hiện có với PySpark?

Trả lời

101

Bạn không thể thêm cột tùy ý vào số DataFrame trong Spark. cột mới có thể được tạo ra chỉ bằng cách sử dụng literals (loại đen khác được mô tả trong How to add a constant column in a Spark DataFrame?)

from pyspark.sql.functions import lit 

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) 

df_with_x4 = df.withColumn("x4", lit(0)) 
df_with_x4.show() 

## +---+---+-----+---+ 
## | x1| x2| x3| x4| 
## +---+---+-----+---+ 
## | 1| a| 23.0| 0| 
## | 3| B|-23.0| 0| 
## +---+---+-----+---+ 

chuyển một cột hiện có:

from pyspark.sql.functions import exp 

df_with_x5 = df_with_x4.withColumn("x5", exp("x3")) 
df_with_x5.show() 

## +---+---+-----+---+--------------------+ 
## | x1| x2| x3| x4|     x5| 
## +---+---+-----+---+--------------------+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| 
## | 3| B|-23.0| 0|1.026187963170189...| 
## +---+---+-----+---+--------------------+ 

bao gồm sử dụng join:

from pyspark.sql.functions import exp 

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) 
df_with_x6 = (df_with_x5 
    .join(lookup, col("x1") == col("k"), "leftouter") 
    .drop("k") 
    .withColumnRenamed("v", "x6")) 

## +---+---+-----+---+--------------------+----+ 
## | x1| x2| x3| x4|     x5| x6| 
## +---+---+-----+---+--------------------+----+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| foo| 
## | 3| B|-23.0| 0|1.026187963170189...|null| 
## +---+---+-----+---+--------------------+----+ 

hoặc tạo với hàm/udf:

from pyspark.sql.functions import rand 

df_with_x7 = df_with_x6.withColumn("x7", rand()) 
df_with_x7.show() 

## +---+---+-----+---+--------------------+----+-------------------+ 
## | x1| x2| x3| x4|     x5| x6|     x7| 
## +---+---+-----+---+--------------------+----+-------------------+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617| 
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873| 
## +---+---+-----+---+--------------------+----+-------------------+ 

Các chức năng được tích hợp sẵn, hiệu năng (pyspark.sql.functions), được ánh xạ tới biểu thức Xúc tác, thường được ưu tiên hơn các hàm do người dùng xác định.

Nếu bạn muốn thêm nội dung của một RDD tùy ý như là một cột bạn có thể

  • thêm row numbers to existing data frame
  • gọi zipWithIndex trên RDD và chuyển nó sang khung dữ liệu
  • tham gia cả hai chỉ số sử dụng như một tham gia chìa khóa
+0

"Các cột mới chỉ có thể được tạo bằng cách sử dụng các chữ" Chính xác thì chữ có nghĩa gì trong ngữ cảnh này? – timbram

35

Để thêm một cột bằng cách sử dụng UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) 

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

def valueToCategory(value): 
    if value == 1: return 'cat1' 
    elif value == 2: return 'cat2' 
    ... 
    else: return 'n/a' 

# NOTE: it seems that calls to udf() must be after SparkContext() is called 
udfValueToCategory = udf(valueToCategory, StringType()) 
df_with_cat = df.withColumn("category", udfValueToCategory("x1")) 
df_with_cat.show() 

## +---+---+-----+---------+ 
## | x1| x2| x3| category| 
## +---+---+-----+---------+ 
## | 1| a| 23.0|  cat1| 
## | 3| B|-23.0|  n/a| 
## +---+---+-----+---------+ 
13

Đối Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen')) 
+1

Cần phải là df.select ('*', (df.age + 10) .alias ('agePlusTen')) –

+0

Cảm ơn, và nếu bạn nhập 'df = df.select ('*', (df.age + 10) .alias ('agePlusTen')) 'bạn có hiệu quả _adding một cột tùy ý như @ zero323 cảnh báo chúng tôi ở trên là không thể, trừ khi có điều gì đó sai trái khi làm điều này trong Spark, trong Pandas đó là cách tiêu chuẩn .. – cardamom

+0

Có một phiên bản này cho pySpark? – Tagar

-1

Bạn có thể định nghĩa một mới udf khi thêm một column_name:

u_f = F.udf(lambda :yourstring,StringType()) 
a.select(u_f().alias('column_name') 
-1
from pyspark.sql.functions import udf 
from pyspark.sql.types import * 
func_name = udf(
    lambda val: val, # do sth to val 
    StringType() 
) 
df.withColumn('new_col', func_name(df.old_col)) 
+0

Bạn cần phải gọi 'StringType()'. – gberger

0

Tôi muốn đưa ra một ví dụ tổng quát cho một trường hợp sử dụng rất giống nhau:

Trường hợp sử dụng: Tôi có csv bao gồm:

First|Third|Fifth 
data|data|data 
data|data|data 
...billion more lines 

tôi cần phải thực hiện một số biến đổi và csv cuối cùng cần phải trông giống như

First|Second|Third|Fourth|Fifth 
data|null|data|null|data 
data|null|data|null|data 
...billion more lines 

tôi cần phải làm điều này vì đây là sơ đồ được xác định bởi một số mô hình và tôi cần cho dữ liệu cuối cùng của tôi là tương thích với SQL Bulk Inserts và những thứ như vậy.

như vậy:

1) Tôi đọc csv gốc bằng spark.read và gọi là "df".

2) Tôi làm điều gì đó với dữ liệu.

3) Tôi thêm các cột rỗng sử dụng kịch bản này:

outcols = [] 
for column in MY_COLUMN_LIST: 
    if column in df.columns: 
     outcols.append(column) 
    else: 
     outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column))) 

df = df.select(outcols) 

Bằng cách này, bạn có thể cấu trúc giản đồ của bạn sau khi tải một csv (cũng sẽ làm việc cho sắp xếp lại các cột nếu bạn phải làm điều này cho nhiều những cái bàn).

Các vấn đề liên quan