2016-04-01 16 views
10

ví dụlàm thế nào để lặp qua mỗi hàng của dataFrame trong pyspark

sqlContext = SQLContext(sc) 

sample=sqlContext.sql("select Name ,age ,city from user") 
sample.show() 

Những tuyên bố trên in toàn bộ bảng trên thiết bị đầu cuối, nhưng tôi muốn truy cập mỗi hàng trong bảng đó sử dụng for hoặc while để thực hiện tính toán thêm.

+0

Tôi tin rằng tôi đã cung cấp câu trả lời đúng. Bạn có thể chọn hoặc cung cấp phản hồi để cải thiện không? – aaronsteers

Trả lời

13

Bạn chỉ đơn giản là không thể. DataFrames, giống như các cấu trúc dữ liệu được phân phối khác, không phải là iterable và có thể được truy cập bằng cách chỉ sử dụng các hàm SQL và/hoặc hàm thứ tự chuyên dụng cao hơn.

Bạn có thể dĩ nhiên collect hoặc chuyển đổi toLocalIterator và lặp cục bộ

for row in df.rdd.collect(): 
    do_something(row) 

nhưng nó đập tất cả các mục đích sử dụng Spark.

2

Nếu bạn muốn làm điều gì đó cho mỗi hàng trong đối tượng DataFrame, hãy sử dụng map. Điều này sẽ cho phép bạn thực hiện các phép tính tiếp theo trên mỗi hàng. Nó tương đương với vòng lặp trên toàn bộ tập dữ liệu từ 0 đến len(dataset)-1.

Lưu ý rằng điều này sẽ trả về PipelinedRDD, không phải là một DataFrame.

21

Bạn sẽ xác định chức năng tùy chỉnh và sử dụng bản đồ.

def customFunction(row): 

    return (row.name, row.age, row.city) 

sample2 = sample.rdd.map(customFunction) 

hoặc

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city)) 

Các chức năng tùy chỉnh sau đó sẽ được áp dụng cho tất cả các hàng của dataframe. Lưu ý rằng mẫu 2 sẽ là RDD, không phải là một khung dữ liệu.

Bản đồ là cần thiết nếu bạn định thực hiện các phép tính phức tạp hơn. Nếu bạn chỉ cần thêm cột có nguồn gốc, bạn có thể sử dụng withColumn, trả về một khung dữ liệu.

sample3 = sample.withColumn('age2', sample.age + 2) 
2

Sử dụng comprehensions danh sách trong python, bạn có thể thu thập toàn bộ cột của giá trị vào một danh sách chỉ sử dụng hai dòng:

df = sqlContext.sql("show tables in default") 
tableList = [x["tableName"] for x in df.rdd.collect()] 

Trong ví dụ trên, chúng tôi trả lại một danh sách các bảng trong cơ sở dữ liệu ' mặc định ', nhưng điều tương tự có thể được điều chỉnh bằng cách thay thế truy vấn được sử dụng trong sql().

Hoặc hơn viết tắt:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()] 

Và ví dụ bạn của ba cột, chúng ta có thể tạo ra một danh sách các từ điển, và sau đó lặp qua chúng trong một vòng lặp for.

sql_text = "select name, age, city from user" 
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
      for x in sqlContext.sql(sql_text).rdd.collect()] 
for row in tupleList: 
    print("{} is a {} year old from {}".format(
     row["name"], 
     row["age"], 
     row["city"])) 
0

trên

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

nên

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

cho name, age, và city không biến mà đơn giản là chìa khóa của từ điển.

Các vấn đề liên quan