Thao tác như thế này không được API DataFrame hỗ trợ. Có thể zip
hai RDD nhưng để làm cho nó hoạt động, bạn phải khớp cả số phân vùng và số phần tử trên mỗi phân vùng. Giả sử đây là trường hợp:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
val a: DataFrame = sc.parallelize(Seq(
("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")
// Merge rows
val rows = a.rdd.zip(b.rdd).map{
case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}
// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)
// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
Nếu điều kiện trên không được đáp ứng lựa chọn duy nhất mà đến với tâm là thêm một chỉ số và tham gia:
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
// Add index
df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
// Create schema
StructType(df.schema.fields :+ StructField("_index", LongType, false))
)
// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)
// Join and clean
val ab = aWithIndex
.join(bWithIndex, Seq("_index"))
.drop("_index")
Có an toàn để giả định rằng hai dataframes có cùng # hàng? –