19

Tôi đang cố gắng tìm ra cách tốt nhất để có được giá trị lớn nhất trong cột dữ liệu Spark.Cách tốt nhất để lấy giá trị tối đa trong cột dữ liệu Spark

Hãy xem xét ví dụ sau:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
df.show() 

nào tạo:

+---+---+ 
| A| B| 
+---+---+ 
|1.0|4.0| 
|2.0|5.0| 
|3.0|6.0| 
+---+---+ 

Mục tiêu của tôi là để tìm giá trị lớn nhất trong cột A (qua sự kiểm tra, đây là 3.0). Sử dụng PySpark, đây là bốn cách tiếp cận tôi có thể nghĩ:

# Method 1: Use describe() 
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 

# Method 2: Use SQL 
df.registerTempTable("df_table") 
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 

# Method 3: Use groupby() 
df.groupby().max('A').collect()[0].asDict()['max(A)'] 

# Method 4: Convert to RDD 
df.select("A").rdd.max()[0] 

Mỗi phòng trong số trên cho câu trả lời đúng, nhưng trong sự vắng mặt của một công cụ Spark profiling Tôi không thể nói đó là tốt nhất.

Bất kỳ ý tưởng nào từ trực giác hoặc kinh nghiệm về phương pháp nào ở trên là hiệu quả nhất về thời gian chạy Spark hoặc sử dụng tài nguyên hoặc liệu có phương pháp trực tiếp hơn các phương pháp trên không?

+5

Phương pháp 2 và 3 tương đương và sử dụng các kế hoạch lôgic vật lý và tối ưu hóa giống nhau. Phương pháp 4 áp dụng giảm với tối đa trên rdd. Nó có thể chậm hơn hoạt động trực tiếp trên DataFrame. Phương pháp 1 ít nhiều tương đương với 2 và 3. – zero323

+1

@ zero323 Còn về 'df.select (max (" A ")) thì thu thập() [0] .asDict() ['max (A)']'? Có vẻ tương đương với Phương pháp 2 trong khi nhỏ gọn hơn và cũng trực quan hơn với Phương pháp 3. – desertnaut

+0

- Cách chậm nhất là phương pháp 4, vì bạn thực hiện chuyển đổi DF sang RDD của toàn bộ cột và sau đó trích xuất giá trị lớn nhất; –

Trả lời

15
>df1.show() 
+-----+--------------------+--------+----------+-----------+ 
|floor|   timestamp|  uid|   x|   y| 
+-----+--------------------+--------+----------+-----------+ 
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418| 
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393| 
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585| 
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073| 

>row1 = df1.agg({"x": "max"}).collect()[0] 
>print row1 
Row(max(x)=110.33613) 
>print row1["max(x)"] 
110.33613 

Câu trả lời gần giống như phương pháp3. nhưng dường như các "asDict()" trong method3 thể được gỡ bỏ

+0

ai đó có thể giải thích tại sao thu thập() [0] là cần thiết? – jibiel

+2

@jibiel 'collect()' trả về một danh sách (trong trường hợp này với một mục duy nhất), vì vậy bạn cần truy cập vào mục đầu tiên (chỉ) trong danh sách –

2

Trong trường hợp một số thắc mắc làm thế nào để làm điều đó bằng Scala, here you go (sử dụng Spark 2.0 +.):

scala> df.createOrReplaceTempView("TEMP_DF") 
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF"). 
    collect()(0).getInt(0) 
scala> print(myMax) 
117 
6

Max giá trị cho một cột cụ thể của một dataframe có thể đạt được bằng cách sử dụng -

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

0

Ghi chú: Spark được thiết kế để làm việc trên Big Data - tính toán phân tán. Kích thước của ví dụ DataFrame là rất nhỏ, vì vậy thứ tự của các ví dụ thực tế có thể được thay đổi liên quan đến ví dụ nhỏ ~.

Chậm nhất: Method_1, vì .describe ("A") tính toán min, max, trung bình, stddev, và đếm (5 phép tính trên toàn bộ cột)

Medium: Method_4, bởi vì, .rdd (DF để Chuyển đổi RDD) làm chậm quá trình.

Nhanh hơn: Method_3 ~ Method_2 ~ method_5, vì logic rất giống nhau, vì vậy trình tối ưu hóa chất xúc tác của Spark tuân theo logic rất giống với số thao tác tối thiểu (tối đa một cột cụ thể, thu thập một khung dữ liệu giá trị); (.asDict() cho biết thêm một chút thêm thời gian so sánh 3,2 đến 5)

import pandas as pd 
import time 

time_dict = {} 

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
#-- For bigger/realistic dataframe just uncomment the following 3 lines 
#lst = list(np.random.normal(0.0, 100.0, 100000)) 
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst}) 
#dfff = self.sqlContext.createDataFrame(pdf) 

tic1 = int(round(time.time() * 1000)) 
# Method 1: Use describe() 
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 
tac1 = int(round(time.time() * 1000)) 
time_dict['m1']= tac1 - tic1 
print (max_val) 

tic2 = int(round(time.time() * 1000)) 
# Method 2: Use SQL 
dfff.registerTempTable("df_table") 
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 
tac2 = int(round(time.time() * 1000)) 
time_dict['m2']= tac2 - tic2 
print (max_val) 

tic3 = int(round(time.time() * 1000)) 
# Method 3: Use groupby() 
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)'] 
tac3 = int(round(time.time() * 1000)) 
time_dict['m3']= tac3 - tic3 
print (max_val) 

tic4 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.select("A").rdd.max()[0] 
tac4 = int(round(time.time() * 1000)) 
time_dict['m4']= tac4 - tic4 
print (max_val) 

tic5 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.agg({"A": "max"}).collect()[0][0] 
tac5 = int(round(time.time() * 1000)) 
time_dict['m5']= tac5 - tic5 
print (max_val) 

print time_dict 

Kết quả trên một cạnh-node của một cụm trong mili giây (ms):

nhỏ DF (mili giây): {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}

lớn hơn DF (ms): {'m1': 10260, 'm2 ': 452,' m3 ': 465,' m4 ': 916,' m5 ': 373}

Các vấn đề liên quan