2016-11-22 21 views
14

Tôi bị nhầm lẫn là tại sao có vẻ như Spark đang sử dụng 1 tác vụ cho rdd.mapPartitions khi chuyển đổi RDD kết quả thành một DataFrame.pyspark sử dụng một nhiệm vụ cho mapPartitions khi chuyển đổi rdd thành dataframe

Đây là một vấn đề đối với tôi bởi vì tôi muốn đi từ:

DataFrame ->RDD ->rdd.mapPartitions ->DataFrame

để tôi có thể đọc dữ liệu (DataFrame), áp dụng một hàm không SQL cho các khối dữ liệu (mapPartitions trên RDD) và sau đó chuyển đổi trở lại một DataFrame để tôi có thể sử dụng quy trình DataFrame.write.

Tôi có thể đi từ DataFrame -> mapPartitions và sau đó sử dụng một nhà văn RDD như saveAsTextFile nhưng đó là ít hơn lý tưởng vì quá trình DataFrame.write có thể làm những việc như ghi đè và lưu dữ liệu ở định dạng Orc. Vì vậy, tôi muốn tìm hiểu lý do tại sao điều này đang xảy ra, nhưng từ quan điểm thực tế, tôi chủ yếu quan tâm đến việc có thể chỉ cần đi từ một DataFrame -> mapParitions -> để sử dụng quá trình DataFrame.write.

Đây là ví dụ có thể tái sản xuất. Các công trình sau đây như mong đợi, với 100 nhiệm vụ cho công tác mapPartitions:

from pyspark.sql import SparkSession 
import pandas as pd 

spark = SparkSession \ 
    .builder \ 
    .master("yarn-client") \ 
    .enableHiveSupport() \ 
    .getOrCreate() 

sc = spark.sparkContext 

df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]}) 
spark_df = spark.createDataFrame(df).repartition(100) 

def f(part): 
    return [(1,2)] 

spark_df.rdd.mapPartitions(f).collect() 

Tuy nhiên, nếu dòng cuối cùng là thay đổi một cái gì đó giống như spark_df.rdd.mapPartitions(f).toDF().show() sau đó sẽ chỉ có một nhiệm vụ cho công tác mapPartitions.

Một số ảnh chụp màn hình minh họa dưới đây: enter image description here enter image description here

Trả lời

5

DataFrame.show() chỉ cho thấy số đầu tiên của dãy dataframe của bạn, theo mặc định chỉ là người đầu tiên 20. Nếu con số đó là nhỏ hơn so với số lượng hàng trên mỗi phân vùng, Spark là lười biếng và chỉ đánh giá một phân vùng duy nhất, tương đương với một tác vụ duy nhất.

Bạn cũng có thể thực hiện collect trên một khung dữ liệu, để tính toán và thu thập tất cả phân vùng và xem lại 100 tác vụ.

Bạn sẽ vẫn thấy nhiệm vụ runJob trước tiên, được gọi là toDF để có thể xác định lược đồ của khung dữ liệu kết quả: nó cần xử lý một phân vùng để có thể xác định loại kết xuất của ánh xạ chức năng. Sau giai đoạn ban đầu, hành động thực tế như collect sẽ xảy ra trên tất cả các phần tử. Ví dụ, đối với tôi chạy đoạn mã của bạn với dòng cuối cùng thay thế bằng spark_df.rdd.mapPartitions(f).toDF().collect() kết quả trong các giai đoạn:

enter image description here

+0

Các tương tự xảy ra khi gọi 'DataFrame.write' vào kết quả là tốt. – David

+0

Bạn đang chờ công việc của mình hoàn thành? Khi tôi làm 'toDF(). Collect()', tôi thấy một giai đoạn 'runJob' với một nhiệm vụ nữa, được khởi xướng bởi' toDF' để kiểm tra lược đồ của khung dữ liệu kết quả, theo sau là một giai đoạn 'thu thập' với dự kiến 100 nhiệm vụ. – sgvd

+1

'collect()' không phải là một điều khả thi đối với tôi trong cuộc sống thực vì kết quả cuối cùng là vài trăm GB dữ liệu. Công việc thất bại khi chạy 'DataFrame.write' chỉ với 1 tác vụ nhưng thành công khi chạy' saveAsText'. Tôi sẽ chỉnh sửa các ví dụ từ việc thu thập và hiển thị để lưu dữ liệu vì có thể có sự khác biệt giữa các dữ liệu đó. – David

Các vấn đề liên quan