2016-02-29 30 views
20

Câu hỏi này không phải là mới, tuy nhiên tôi đang tìm thấy hành vi đáng ngạc nhiên trong Spark. Tôi cần phải thêm một cột ID hàng vào một DataFrame. Tôi đã sử dụng phương thức DataFrame monotonically_increasing_id() và nó cung cấp cho tôi một col bổ sung của các ID hàng đơn (không phải là liên tiếp bằng cách này, nhưng là duy nhất).Tôi làm cách nào để thêm cột liên tục của id hàng vào Spark DataFrame?

Vấn đề tôi gặp phải là khi tôi lọc DataFrame, các Id hàng trong DataFrame kết quả được gán lại. Hai DataFrames được hiển thị bên dưới.

  • một trong những đầu tiên là DataFrame ban đầu với các ID hàng bổ sung như sau:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • các DataFrame thứ hai là một thu được sau khi lọc trên col P qua df.filter(col("P")).

Vấn đề được minh họa bằng các ROWID cho ID Khách hàng 169, đó là 5 trong DataFrame ban đầu, nhưng sau khi lọc mà ROWID (5) đã được tái giao cho custmId 773 khi ID Khách hàng 169 được lọc ra! Tôi không biết tại sao đây là hành vi mặc định.

Tôi muốn rowIds là "dính"; nếu tôi xóa các hàng khỏi DataFrame, tôi không muốn ID của họ "được sử dụng lại", tôi muốn chúng đi cùng với các hàng của chúng. Có thể làm điều đó không? Tôi không thấy bất kỳ cờ nào để yêu cầu hành vi này từ phương thức monotonically_increasing_id.

+---------+--------------------+-------+ 
| custId | features| P |rowId| 
+---------+--------------------+-------+ 
|806  |[50,5074,...| true| 0| 
|832  |[45,120,1...| true| 1| 
|216  |[6691,272...| true| 2| 
|926  |[120,1788...| true| 3| 
|875  |[54,120,1...| true| 4| 
|169  |[19406,21...| false| 5| 

after filtering on P: 
+---------+--------------------+-------+ 
| custId| features| P |rowId| 
+---------+--------------------+-------+ 
|  806|[50,5074,...| true| 0| 
|  832|[45,120,1...| true| 1| 
|  216|[6691,272...| true| 2| 
|  926|[120,1788...| true| 3| 
|  875|[54,120,1...| true| 4| 
|  773|[3136,317...| true| 5| 
+1

Bạn có thể chia sẻ mã đầy đủ của mình để tạo hai DataFrames mẫu không? Đối với những gì nó có giá trị, điều này có thể là do tối ưu hóa truy vấn SQL diễn ra trong đó các giai đoạn bản đồ "độc lập" có thể được sắp xếp lại. –

+0

Hamel, thực sự không có biến đổi hoặc hành động nào khác so với tôi đã đăng. Các khung dữ liệu được hiển thị là kết quả của df.show(). Bạn có thể dễ dàng tạo lại hành vi này, tạo một khung dữ liệu và thêm một cột ID hàng như trên, sau đó thêm một cột boolean ngẫu nhiên vào nó. Sau đó lọc trên cột đó và xem cách các ID hàng bạn nhận được từ tăng đơn điệu được "tái sử dụng" như tôi mô tả. – Kai

+0

@Kai Tôi thực sự sẽ thêm rằng cách đơn giản nhất để tái tạo nó là chỉ sử dụng một phân vùng duy nhất. – zero323

Trả lời

11

Spark 2,0

  • này được vấn đề đã được giải quyết trong Spark 2.0 với SPARK-14241.

  • Một vấn đề tương tự như đã được giải quyết trong Spark 2.1 với SPARK-14393 Spark 1.x Vấn đề

bạn kinh nghiệm là khá tinh tế nhưng có thể được giảm đến một thực tế đơn giản là một monotonically_increasing_id chức năng cực kỳ xấu xí. Rõ ràng là không thuần khiết và giá trị của nó phụ thuộc vào thứ gì đó hoàn toàn nằm ngoài tầm kiểm soát của bạn.

Nó không nhận bất kỳ tham số nào từ góc độ trình tối ưu hóa, không quan trọng khi được gọi và có thể được đẩy sau tất cả các thao tác khác. Do đó hành vi bạn thấy.

Nếu bạn xem mã, bạn sẽ tìm thấy mã này được đánh dấu rõ ràng bằng cách mở rộng biểu tượng MonotonicallyIncreasingID bằng Nondeterministic.

Tôi không nghĩ rằng có bất kỳ giải pháp thanh lịch nào nhưng cách bạn có thể xử lý việc này là thêm sự phụ thuộc nhân tạo vào giá trị được lọc.Ví dụ với một UDF như thế này:

from pyspark.sql.types import LongType 
from pyspark.sql.functions import udf 

bound = udf(lambda _, v: v, LongType()) 

(df 
    .withColumn("rn", monotonically_increasing_id()) 
    # Due to nondeterministic behavior it has to be a separate step 
    .withColumn("rn", bound("P", "rn")) 
    .where("P")) 

Nói chung nó có thể là sạch hơn để thêm các chỉ số sử dụng zipWithIndex trên RDD và sau đó chuyển đổi nó trở lại một DataFrame.


* Giải pháp được hiển thị ở trên không còn là giải pháp hợp lệ (cũng không bắt buộc) trong Spark 2.x khi Python UDF tuân theo tối ưu hóa kế hoạch thực hiện.

3

Tôi không thể tạo lại điều này. Tôi đang sử dụng Spark 2.0 mặc dù vậy có thể hành vi đã thay đổi hoặc tôi không làm điều tương tự như bạn.

val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true)) 
.toDF("name", "value","flag") 
.withColumn("rowd", monotonically_increasing_id()) 

df.show 

val df2 = df.filter(col("flag")=== true) 

df2.show 

df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields] 
+-----+-----+-----+----+ 
| name|value| flag|rowd| 
+-----+-----+-----+----+ 
| one| 1| true| 0| 
| two| 2|false| 1| 
|three| 3| true| 2| 
| four| 4| true| 3| 
+-----+-----+-----+----+ 
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields] 
+-----+-----+----+----+ 
| name|value|flag|rowd| 
+-----+-----+----+----+ 
| one| 1|true| 0| 
|three| 3|true| 2| 
| four| 4|true| 3| 
+-----+-----+----+----+ 
+0

Tôi không tìm thấy bất kỳ vấn đề nào với mã trên – thebluephantom

+0

tương đương với gì ** monotonically_increasing_id() ** trong gói java – Yugerten

+0

org.apache.spark.sql.functions có sẵn trong tia lửa Java API https: //. apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#monotonicallyIncreasingId-- – Davos

1

Để có được xung quanh đánh giá chuyển đổi của monotonically_increasing_id(), bạn có thể thử viết dataframe vào đĩa và đọc lại. Sau đó, cột id bây giờ chỉ đơn giản là một trường dữ liệu đang được đọc, thay vì được tính toán động tại một số điểm trong đường ống. Mặc dù nó là một giải pháp khá xấu xí, nó đã hoạt động khi tôi làm một bài kiểm tra nhanh.

1

Điều này phù hợp với tôi. Đã tạo một cột nhận dạng khác và sử dụng chức năng cửa sổ row_number

import org.apache.spark.sql.functions.{row_number} 
import org.apache.spark.sql.expressions.Window 

val df1: DataFrame = df.withColumn("Id",lit(1)) 

df1 
.select(
..., 
row_number() 
.over(Window 
.partitionBy("Id" 
.orderBy(col("...").desc)) 
) 
.alias("Row_Nbr") 
) 
Các vấn đề liên quan