2016-10-15 55 views
7

Tôi có một khung dữ liệu Spark có một số giá trị bị thiếu. Tôi muốn thực hiện một imputation đơn giản bằng cách thay thế các giá trị còn thiếu với giá trị trung bình cho cột đó. Tôi rất mới với Spark, vì vậy tôi đã đấu tranh để thực hiện logic này. Đây là những gì tôi đã cố gắng làm như vậy cho đến nay:Thay thế các giá trị bị thiếu bằng giá trị trung bình - Biểu tượng Spark

a) Để làm điều này cho một cột duy nhất (giả sử Col A), dòng mã này dường như làm việc:

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA")) 
    .first()(0).asInstanceOf[Double]) 
    .otherwise($"ColA")) 

b) Tuy nhiên, Tôi đã không thể tìm ra, làm thế nào để làm điều này cho tất cả các cột trong dataframe của tôi. Tôi đã thử chức năng Bản đồ, nhưng tôi tin rằng nó lặp qua từng hàng của một khung dữ liệu

c) Có một câu hỏi tương tự trên SO - here. Và trong khi tôi thích giải pháp (sử dụng các bảng tổng hợp và kết hợp lại), tôi rất muốn biết nếu có một cách để làm điều này bằng cách lặp qua mỗi cột (tôi đến từ R, do đó, lặp qua mỗi cột bằng cách sử dụng một thứ tự cao hơn chức năng như lapply có vẻ tự nhiên hơn với tôi).

Cảm ơn!

+0

Bằng cách này, nó được coi là một thực tế xấu để sử dụng 'asInstanceOf [T]' trong 'scala'. –

Trả lời

11

Spark> = 2,2

Bạn có thể sử dụng org.apache.spark.ml.feature.Imputer.

Scala:

import org.apache.spark.ml.feature.Imputer 

val imputer = new Imputer() 
    .setInputCols(df.columns) 
    .setOutputCols(df.columns.map(c => s"${c}_imputed")) 
    .setStrategy("mean") 

imputer.fit(df).transform(df) 

Python:

from pyspark.ml.feature import Imputer 

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns] 
) 
imputer.fit(df).transform(df) 

Spark < 2,2

Ở đây bạn là:

import org.apache.spark.sql.functions.mean 

df.na.fill(df.columns.zip(
    df.select(df.columns.map(mean(_)): _*).first.toSeq 
).toMap) 

nơi

df.columns.map(mean(_)): Array[Column] 

tính trung bình cho mỗi cột,

df.select(_: *).first.toSeq: Seq[Any] 

thu thập giá trị tổng hợp và chuyển đổi hàng để Seq[Any] (Tôi biết điều đó là không tối ưu nhưng đây là API chúng tôi phải làm việc với),

df.columns.zip(_).toMap: Map[String,Any] 

tạo aMap: Map[String, Any] mà bản đồ từ tên cột để trung bình của nó, và cuối cùng là:

df.na.fill(_): DataFrame 

lấp đầy các giá trị thiếu bằng:

fill: Map[String, Any] => DataFrame 

từ DataFrameNaFunctions.

Để ingore NaN mục bạn có thể thay thế:

df.select(df.columns.map(mean(_)): _*).first.toSeq 

với:

import org.apache.spark.sql.functions.{col, isnan, when} 


df.select(df.columns.map(
    c => mean(when(!isnan(col(c)), col(c))) 
): _*).first.toSeq 
1

Đối PySpark, đây là đoạn code tôi sử dụng:

mean_dict = { col: 'mean' for col in df.columns } 
col_avgs = df.agg(mean_dict).collect()[0].asDict() 
col_avgs = { k[4:-1]: v for k,v in col_avgs.iteritems() } 
df.fillna(col_avgs).show() 

Bốn bước như sau:

  1. Tạo từ điển mean_dict tên cột ánh xạ tới các hoạt động tổng hợp (có nghĩa)
  2. Tính giá trị trung bình cho mỗi cột, và lưu nó như từ điển col_avgs
  3. Tên cột trong col_avgs bắt đầu với avg( và kết thúc bằng ), ví dụ avg(col1). Bỏ dấu ngoặc đơn ra.
  4. Điền các cột của dataframe với mức trung bình sử dụng col_avgs
1

Đối imputing trung bình (thay vì giá trị trung bình) trong PySpark < 2,2

## filter numeric cols 
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)] 
### Compute a dict with <col_name, median_value> 
median_dict = dict() 
for c in num_cols: 
    median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0] 

Sau đó, áp dụng na.fill

df_imputed = df.na.fill(median_dict) 
Các vấn đề liên quan