2015-08-03 30 views
9

Tôi có nhật ký người dùng mà tôi đã lấy từ csv và được chuyển đổi thành DataFrame để tận dụng các tính năng truy vấn SparkSQL. Một người dùng sẽ tạo nhiều mục nhập mỗi giờ và tôi muốn thu thập một số thông tin thống kê cơ bản cho mỗi người dùng; thực sự chỉ là số lượng các trường hợp người dùng, mức trung bình và độ lệch chuẩn của nhiều cột. Tôi đã có thể nhanh chóng có được giá trị trung bình và đếm thông tin bằng cách sử dụng groupby ($ "user") và aggregator với các chức năng SparkSQL cho đếm và trung bình:Tính toán độ lệch chuẩn của dữ liệu được nhóm trong Spark DataFrame

val meanData = selectedData.groupBy($"user").agg(count($"logOn"), 
avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"), 
avg($"repliesPerHour"), avg($"duration")) 

Tuy nhiên, tôi dường như không thể tìm thấy một cách bình đẳng thanh lịch để tính toán độ lệch chuẩn. Cho đến nay tôi chỉ có thể tính toán nó bằng cách ánh xạ một chuỗi, cặp đôi và sử dụng StatCounter() tiện ích độ lệch chuẩn:.

val stdevduration = duration.groupByKey().mapValues(value => 
org.apache.spark.util.StatCounter(value).stdev) 

này trả về một RDD tuy nhiên, và tôi muốn thử và giữ cho nó tất cả trong một DataFrame cho các truy vấn khác có thể thực hiện được trên dữ liệu trả về.

Trả lời

29

Spark 1.6+

Bạn có thể sử dụng stddev_pop để tính toán độ lệch chuẩn dân số và stddev/stddev_samp để tính không thiên vị tiêu chuẩn mẫu lệch:

import org.apache.spark.sql.functions.{stddev_samp, stddev_pop} 

selectedData.groupBy($"user").agg(stdev_pop($"duration")) 

Spark 1.5 và dưới (Bản gốc answer):

Không phải như vậy xinh đẹp và thiên vị (giống như giá trị trả về từ describe) nhưng sử dụng công thức:

wikipedia sdev

bạn có thể làm một cái gì đó như thế này:

import org.apache.spark.sql.functions.sqrt 

selectedData 
    .groupBy($"user") 
    .agg((sqrt(
     avg($"duration" * $"duration") - 
     avg($"duration") * avg($"duration") 
    )).alias("duration_sd")) 

Bạn có thể tất nhiên tạo ra một chức năng để giảm sự lộn xộn:

import org.apache.spark.sql.Column 
def mySd(col: Column): Column = { 
    sqrt(avg(col * col) - avg(col) * avg(col)) 
} 

df.groupBy($"user").agg(mySd($"duration").alias("duration_sd")) 

Nó cũng có thể sử dụng Hive UDF:

df.registerTempTable("df") 
sqlContext.sql("""SELECT user, stddev(duration) 
        FROM df 
        GROUP BY user""") 

Nguồn ảnh: https://en.wikipedia.org/wiki/Standard_deviation

+2

trình Perfect! Cảm ơn bạn rất nhiều vì câu trả lời tuyệt vời và để thêm vào lời gọi hàm bí danh. – the3rdNotch

+0

typo siêu nhỏ: stdev -> stddev – Jesse

+0

Có ba hàm trong hàm SparkSQL: stddev, stddev_samp, stddev_pop. Vậy có cần phải thực hiện tùy chỉnh nữa không? –

Các vấn đề liên quan