2015-09-24 20 views
12

Tôi có một số rất lớn pyspark.sql.dataframe.DataFrame có tên df. Tôi cần một số cách để liệt kê các bản ghi - do đó, có thể truy cập bản ghi với một số chỉ mục nhất định. (Hoặc chọn nhóm các hồ sơ với chỉ số dao động)PySpark DataFrames - cách liệt kê mà không cần chuyển đổi sang Pandas?

Trong gấu trúc, tôi có thể làm chỉ

indexes=[2,3,6,7] 
df[indexes] 

Ở đây tôi muốn một cái gì đó tương tự, (và không chuyển đổi dataframe để gấu trúc)

Gần nhất tôi có thể lấy đến là:

  • thống kê đầy đủ các đối tượng trong dataframe gốc bởi:

    indexes=np.arange(df.count()) 
    df_indexed=df.withColumn('index', indexes) 
    
    • Đang tìm kiếm các giá trị tôi cần sử dụng ở đâu) chức năng (.

CÂU HỎI:

  1. Tại sao nó không hoạt động và làm thế nào để làm cho nó làm việc? Làm thế nào để thêm một hàng vào một khung dữ liệu?
  2. Nó sẽ làm việc sau này để làm một cái gì đó như:

    indexes=[2,3,6,7] 
    df1.where("index in indexes").collect() 
    
  3. Bất kỳ nhanh hơn và cách đơn giản hơn để đối phó với nó?

Trả lời

11

Nó không làm việc vì:

  1. đối số thứ hai cho withColumn phải là một không Column một bộ sưu tập. np.array sẽ không làm việc ở đây
  2. khi bạn vượt qua "index in indexes" như một biểu thức SQL để whereindexes là ra khỏi phạm vi và nó không được giải quyết như một định danh hợp lệ

PySpark> = 1.4.0

Bạn có thể thêm số hàng bằng cách sử dụng chức năng cửa sổ tương ứng và truy vấn sử dụng phương pháp Column.isin hoặc chuỗi truy vấn được định dạng chính xác:

from pyspark.sql.functions import col, rowNumber 
from pyspark.sql.window import Window 

w = Window.orderBy() 
indexed = df.withColumn("index", rowNumber().over(w)) 

# Using DSL 
indexed.where(col("index").isin(set(indexes))) 

# Using SQL expression 
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes))) 

Dường như chức năng cửa sổ có tên mà không PARTITION BY khoản di chuyển tất cả dữ liệu vào phân vùng duy nhất để trên có thể không phải là giải pháp tốt nhất sau khi tất cả.

Any faster and simpler way to deal with it?

Không thực sự. Spark DataFrames không hỗ trợ truy cập hàng ngẫu nhiên.

PairedRDD có thể truy cập bằng phương pháp lookup tương đối nhanh nếu dữ liệu được phân đoạn bằng cách sử dụng HashPartitioner. Ngoài ra còn có dự án indexed-rdd hỗ trợ tra cứu hiệu quả.

Sửa:

độc lập của phiên bản PySpark bạn có thể thử một cái gì đó như thế này:

from pyspark.sql import Row 
from pyspark.sql.types import StructType, StructField, LongType 

row = Row("char") 
row_with_index = Row("char", "index") 

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF() 
df.show(5) 

## +----+ 
## |char| 
## +----+ 
## | a| 
## | b| 
## | c| 
## | d| 
## | e| 
## +----+ 
## only showing top 5 rows 

# This part is not tested but should work and save some work later 
schema = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)]) 

indexed = (df.rdd # Extract rdd 
    .zipWithIndex() # Add index 
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows 
    .toDF(schema)) # It will work without schema but will be more expensive 

# inSet in Spark < 1.3 
indexed.where(col("index").isin(indexes)) 
+0

Xin chào @ zero323, tôi đã thử đoạn trích. Mọi thứ hoạt động trừ 'indexed.where (col (" index "). InSet (indexes))' không hoạt động. Nó trả về đối tượng 'TypeError: 'Column' không thể gọi được cho tôi. Bạn có cập nhật về đoạn trích nếu tôi muốn truy vấn nhiều chỉ mục không? – titipata

7

Nếu bạn muốn có một dãy số đó là bảo đảm không va chạm nhưng không đòi hỏi một .over(partitionBy()) sau đó bạn có thể sử dụng monotonicallyIncreasingId().

from pyspark.sql.functions import monotonicallyIncreasingId 
df.select(monotonicallyIncreasingId().alias("rowId"),"*") 

Lưu ý rằng các giá trị không đặc biệt "gọn gàng". Mỗi phân vùng được đưa ra một phạm vi giá trị và đầu ra sẽ không được tiếp giáp. Ví dụ. 0, 1, 2, 8589934592, 8589934593, 8589934594.

này được đưa vào Spark trên 28 tháng 4 năm 2015 ở đây: https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2

1

Bạn chắc chắn có thể thêm một mảng để trích xuất một loạt các lựa chọn của bạn thực sự: Trong Scala, trước hết chúng ta cần tạo một chỉ mục mảng:

val index_array=(1 to df.count.toInt).toArray 

index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

Bây giờ bạn có thể nối cột này vào DF của bạn. Đầu tiên, với điều đó, bạn cần mở DF của chúng ta và lấy nó như một mảng, sau đó nén nó với index_array của bạn và sau đó chúng ta chuyển đổi mảng mới trở lại và RDD. Bước cuối cùng là lấy nó làm DF:

final_df = sc.parallelize((df.collect.map(
    x=>(x(0),x(1))) zip index_array).map(
    x=>(x._1._1.toString,x._1._2.toString,x._2))). 
    toDF("column_name") 

Việc lập chỉ mục sẽ rõ ràng hơn sau đó.

+0

Đây là một cách khá đơn giản, thực tế nhưng rất hay để thực hiện :-) – Steve

Các vấn đề liên quan