2016-02-05 28 views
18

Tôi đang cố sử dụng các khung dữ liệu Spark thay vì RDD vì chúng dường như cao cấp hơn RDD và có xu hướng tạo mã dễ đọc hơn, nhưng tôi sẽ rất vui khi nhận được đề xuất cho một cái gì đó thành ngữ hơn cho nhiệm vụ trong tầm tay.Tìm hàng tối đa cho mỗi nhóm trong Spark DataFrame

Trong cụm Google Dataproc 14 nút, tôi có khoảng 6 triệu tên được dịch sang id theo hai hệ thống khác nhau: sasb. Mỗi chứa name, id_said_sb. Mục tiêu của tôi là tạo bản đồ từ id_sa đến id_sb sao cho mỗi id_sa, id_sb tương ứng là id thường xuyên nhất trong số tất cả các tên được đính kèm với id_sa.

Hãy cố gắng làm rõ bằng ví dụ. Nếu tôi có các hàng sau:

[Row(name='n1', id_sa='a1', id_sb='b1'), 
Row(name='n2', id_sa='a1', id_sb='b2'), 
Row(name='n3', id_sa='a1', id_sb='b2'), 
Row(name='n4', id_sa='a2', id_sb='b2')] 

Mục tiêu của tôi là để tạo ra một ánh xạ từ a1 để b2. Thật vậy, những cái tên liên quan đến a1n1, n2n3, mà bản đồ tương ứng để b1, b2b2, vì vậy b2 được ánh xạ thường gặp nhất trong các tên liên quan đến a1. Trong cùng một cách, a2 sẽ được ánh xạ tới b2. Có thể giả định rằng sẽ luôn có người chiến thắng: không cần phải phá vỡ quan hệ.

Tôi đã hy vọng rằng tôi có thể sử dụng groupBy(df.id_sa) trên khung dữ liệu của mình, nhưng tôi không biết phải làm gì tiếp theo. Tôi đã hy vọng cho một sự kết hợp có thể tạo ra, cuối cùng, các hàng sau:

[Row(id_sa=a1, max_id_sb=b2), 
Row(id_sa=a2, max_id_sb=b2)] 

Nhưng có lẽ tôi đang cố gắng để sử dụng công cụ sai và tôi chỉ cần quay trở lại sử dụng RDDs.

+1

Câu hỏi của bạn là gì? – eliasah

+0

@eliasah bất kỳ con trỏ (liên kết, mẫu mã) về cách để làm điều này với dataframes? –

+1

Bạn muốn thực hiện một nhóm theo một tập hợp tối đa? – eliasah

Trả lời

18

Sử dụng join (nó sẽ gây ra nhiều hơn một hàng trong nhóm trong trường hợp quan hệ):

import pyspark.sql.functions as F 
from pyspark.sql.functions import count, col 

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") 
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") 

cnts.join(maxs, 
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) 
).select(col("cnts.id_sa"), col("cnts.id_sb")) 

Sử dụng chức năng cửa sổ (sẽ thả các mối quan hệ):

from pyspark.sql.functions import rowNumber 
from pyspark.sql.window import Window 

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc()) 

(cnts 
    .withColumn("rn", rowNumber().over(w)) 
    .where(col("rn") == 1) 
    .select("id_sa", "id_sb")) 

Sử dụng struct đặt hàng:

from pyspark.sql.functions import struct 

(cnts 
    .groupBy("id_sa") 
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max")) 
    .select(col("id_sa"), col("max.id_sb"))) 

Xem thêm SPARK DataFrame: select the first row of each group

+1

@QuentinPradet Tôi đã cung cấp một giải pháp khác không được kiểm tra rất tốt nhưng có thể là hiệu suất thú vị-khôn ngoan – zero323

+0

Cảm ơn! Tôi nhận được cùng một hiệu suất với phiên bản chức năng cửa sổ và cấu trúc đặt hàng một (giữa 15s và 20s), có lẽ tôi sẽ thấy sự khác biệt khi tôi sẽ chuyển sang một tập dữ liệu lớn hơn. Ngoài ra, tôi hy vọng bạn không nhớ, nhưng tôi đã thay đổi tối đa để F.max kể từ đó vấp tôi lên. Vui lòng quay lại. –

+1

Tôi không bận tâm chút nào. Nếu bạn thấy bất kỳ lỗi nào hoặc bạn nghĩ rằng có một cách tốt hơn để thể hiện một cái gì đó cảm thấy tự do để chỉnh sửa bất kỳ bài viết của tôi :) Và tôi đồng ý - 'max' mix-up có thể gây nhầm lẫn. Phải mất một lúc tôi mới hiểu được chuyện gì đang xảy ra khi tôi gặp phải điều này lần đầu tiên. – zero323

2

Tôi nghĩ rằng những gì bạn có thể tìm kiếm được các chức năng cửa sổ: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Dưới đây là một ví dụ trong Scala (Tôi không có một Spark Shell với Hive có sẵn ngay bây giờ, vì vậy tôi đã không thể kiểm tra mã, nhưng tôi nghĩ rằng nó sẽ hoạt động):

case class MyRow(name: String, id_sa: String, id_sb: String) 

val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"), 
    MyRow("n2", "a1", "b2"), 
    MyRow("n3", "a1", "b2"), 
    MyRow("n1", "a2", "b2") 
)).toDF("name", "id_sa", "id_sb") 

import org.apache.spark.sql.expressions.Window 

val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc) 

myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb") 

Có thể có nhiều cách hiệu quả hơn để đạt được kết quả tương tự với chức năng Cửa sổ, nhưng tôi hy vọng điều này chỉ cho bạn ở bên phải phương hướng.

Các vấn đề liên quan