2015-05-18 14 views
25

Tôi đang cố gắng giải quyết vấn đề về tuổi của việc thêm số thứ tự vào tập dữ liệu. Tôi đang làm việc với DataFrames và dường như không có DataFrame nào tương đương với RDD.zipWithIndex. Mặt khác, các công trình sau đây hoạt động nhiều hơn hoặc ít hơn theo cách tôi muốn:DataFrame-ified zipWithIndex

val origDF = sqlContext.load(...)  

val seqDF= sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)), 
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields) 
) 

Trong ứng dụng thực tế của tôi, origDF sẽ không được tải trực tiếp ra khỏi tệp - nó sẽ được tạo bởi nối 2-3 DataFrames khác với nhau và sẽ chứa tối đa 100 triệu hàng.

Có cách nào tốt hơn để thực hiện việc này không? Tôi có thể làm gì để tối ưu hóa nó?

Trả lời

3

Từ Spark 1.6 có một chức năng gọi là monotonically_increasing_id()
Nó tạo ra một mới cột với chỉ số đơn sắc 64 bit duy nhất cho mỗi hàng
Nhưng không phải do hậu quả, mỗi phân vùng bắt đầu một phạm vi mới, vì vậy chúng tôi phải tính toán từng phân đoạn bù đắp bef quặng sử dụng nó.
Đang cố gắng để cung cấp một giải pháp "RDD-tự do", tôi đã kết thúc với một số thu(), nhưng nó chỉ thu thập offsets, một giá trị mỗi phân vùng, vì vậy nó sẽ không gây ra oom

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = { 
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", monotonically_increasing_id()) 

    val partitionOffsets = dfWithPartitionId 
     .groupBy("partition_id") 
     .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id") 
     .orderBy("partition_id") 
     .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt") 
     .collect() 
     .map(_.getLong(0)) 
     .toArray 

    dfWithPartitionId 
     .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id"))) 
     .withColumn(indexName, col("partition_offset") + col("inc_id")) 
     .drop("partition_id", "partition_offset", "inc_id") 
}

Giải pháp này không repack các hàng ban đầu và không phân vùng lại dataframe lớn ban đầu, vì vậy nó là khá nhanh trong thế giới thực: 200GB dữ liệu CSV (43 triệu hàng với 150 cột) đọc, lập chỉ mục và đóng gói vào sàn gỗ trong 2 phút 240 lõi
Sau khi kiểm tra giải pháp của mình, tôi đã chạy Kirk Broadhurst's solution và chậm hơn 20 giây
Bạn có thể muốn hoặc không muốn sử dụng dfWithPartitionId.cache(), tùy thuộc vào tác vụ

+0

Rất tốt! Làm tốt lắm! –

27

Bài viết sau đã được đăng thay mặt cho David Griffin (đã chỉnh sửa trong câu hỏi).

Phương pháp dfZipWithIndex tất cả khiêu vũ, tất cả đều nhảy múa. Bạn có thể thiết lập thời gian bắt đầu bù đắp (mặc định là 1), tên cột chỉ mục (mặc định là "id"), và đặt cột ở phía trước hoặc phía sau:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.types.{LongType, StructField, StructType} 
import org.apache.spark.sql.Row 


def dfZipWithIndex(
    df: DataFrame, 
    offset: Int = 1, 
    colName: String = "id", 
    inFront: Boolean = true 
) : DataFrame = { 
    df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln => 
     Row.fromSeq(
     (if (inFront) Seq(ln._2 + offset) else Seq()) 
      ++ ln._1.toSeq ++ 
     (if (inFront) Seq() else Seq(ln._2 + offset)) 
    ) 
    ), 
    StructType(
     (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) 
     ++ df.schema.fields ++ 
     (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false))) 
    ) 
) 
} 
+0

@eliasah - Tôi đã tìm thấy cách hiển thị 'Cửa sổ' để thực hiện việc này. Nó là chậm hơn nhiều, tuy nhiên, nhưng figured bạn có thể muốn có một cái nhìn. Xem câu trả lời dưới đây. –

+0

Thật tuyệt vời. Bất kỳ tham chiếu đến một phiên bản PySpark? Cám ơn vì đã chia sẻ. – Tagar

5

Bắt đầu từ năm Spark 1.5, Window biểu đã được thêm vào Spark. Thay vì phải chuyển đổi DataFrame thành RDD, giờ đây bạn có thể sử dụng org.apache.spark.sql.expressions.row_number. Lưu ý rằng tôi đã tìm thấy hiệu suất cho số trên dfZipWithIndex nhanh hơn đáng kể so với thuật toán dưới đây. Nhưng tôi gửi bài nó vì:

  1. Một người nào khác sẽ bị cám dỗ để thử này
  2. Có lẽ ai đó có thể tối ưu hóa các từ ngữ dưới đây

Dù sao đi nữa, đây là những gì làm việc cho tôi:

import org.apache.spark.sql.expressions._ 

df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1)))) 

Lưu ý rằng tôi sử dụng lit(1) cho cả phân vùng và thứ tự - điều này làm cho mọi thứ nằm trong cùng phân vùng và dường như giữ nguyên bản gốc đặt hàng của DataFrame, nhưng tôi cho rằng đó là những gì làm chậm nó xuống.

Tôi đã thử nghiệm nó trên 4 cột DataFrame với 7.000.000 hàng và sự khác biệt tốc độ rất có ý nghĩa giữa điều này và trên dfZipWithIndex (như tôi đã nói, RDD chức năng là nhiều, nhanh hơn nhiều).

+2

Điều đó không gây ra lỗi OOM nếu tập dữ liệu không vừa với bộ nhớ của một nhân viên? –

+1

Tôi không có ý tưởng - Tôi chỉ biết rằng nó là nhiều, chậm hơn nhiều so với 'RDD' dựa trên' zipWithIndex', và đó là quá đủ cho tôi để ngừng suy nghĩ về nó. Tôi đã đăng trên để những người khác sẽ không bị cám dỗ để đi quá xa xuống con đường này; bản gốc 'dfZipWithIndex' vẫn có vẻ là cách tiếp cận tốt nhất. –

+0

Cảm ơn bạn đã chia sẻ điều này, tôi nghĩ rằng cách không chuyển đổi DF sang RDD sẽ nhanh hơn lúc đầu, và sẽ không đi quá xa theo cách đó ngay bây giờ. –

0

PySpark phiên bản:

from pyspark.sql.types import LongType, StructField, StructType 

def dfZipWithIndex (df, offset=1, colName="rowId"): 
    ''' 
     Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe 
     and preserves a schema 

     :param df: source dataframe 
     :param offset: adjustment to zipWithIndex()'s index 
     :param colName: name of the index column 
    ''' 

    new_schema = StructType(
        [StructField(colName,LongType(),True)]  # new added field in front 
        + df.schema.fields       # previous schema 
       ) 

    zipped_rdd = df.rdd.zipWithIndex() 

    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row))) 

    return spark.createDataFrame(new_rdd, new_schema) 

Cũng tạo ra một jira để thêm chức năng này trong Spark natively: https://issues.apache.org/jira/browse/SPARK-23074