5

Tôi có hai DataFrameab. a là nhưCách nén hai (hoặc nhiều hơn) DataFrame trong Spark

Column 1 | Column 2 
abc  | 123 
cde  | 23 

b là như

Column 1 
1  
2  

Tôi muốn nén ab (hoặc thậm chí nhiều hơn) DataFrames mà trở thành một cái gì đó như:

Column 1 | Column 2 | Column 3 
abc  | 123  | 1 
cde  | 23  | 2 

Làm thế nào tôi có thể làm nó?

+0

Có an toàn để giả định rằng hai dataframes có cùng # hàng? –

Trả lời

16

Thao tác như thế này không được API DataFrame hỗ trợ. Có thể zip hai RDD nhưng để làm cho nó hoạt động, bạn phải khớp cả số phân vùng và số phần tử trên mỗi phân vùng. Giả sử đây là trường hợp:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructField, StructType, LongType} 

val a: DataFrame = sc.parallelize(Seq(
    ("abc", 123), ("cde", 23))).toDF("column_1", "column_2") 
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3") 

// Merge rows 
val rows = a.rdd.zip(b.rdd).map{ 
    case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)} 

// Merge schemas 
val schema = StructType(a.schema.fields ++ b.schema.fields) 

// Create new data frame 
val ab: DataFrame = sqlContext.createDataFrame(rows, schema) 

Nếu điều kiện trên không được đáp ứng lựa chọn duy nhất mà đến với tâm là thêm một chỉ số và tham gia:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
    // Add index 
    df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)}, 
    // Create schema 
    StructType(df.schema.fields :+ StructField("_index", LongType, false)) 
) 

// Add indices 
val aWithIndex = addIndex(a) 
val bWithIndex = addIndex(b) 

// Join and clean 
val ab = aWithIndex 
    .join(bWithIndex, Seq("_index")) 
    .drop("_index") 
+0

Làm thế nào về 'withColumn' trên' DataFrame'? – Reactormonk

+0

@Reactormonk Bạn sử dụng nó ở đây như thế nào? – zero323

+0

Sử dụng cột màu để lấy cột từ df b và sau đó withColumn để thêm cột vào? Không thử nó và tôi cũng có thể tưởng tượng Spark không hỗ trợ điều này. –

1

Thực hiện Dataframes Scala của, không có đơn giản cách ghép hai datafram thành một. Chúng ta có thể đơn giản làm việc xung quanh giới hạn này bằng cách thêm các chỉ mục vào mỗi hàng của các khung dữ liệu. Sau đó, chúng ta có thể tham gia vào bên trong bởi các chỉ số này. Đây là mã sơ khai của tôi thực hiện điều này:

val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2") 
val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId) 

val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3") 
val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId) 

aWithId.join(bWithId, "id") 

A little light reading - Check out how Python does this!

Các vấn đề liên quan