Tôi bị nhầm lẫn là tại sao có vẻ như Spark đang sử dụng 1 tác vụ cho rdd.mapPartitions
khi chuyển đổi RDD kết quả thành một DataFrame.pyspark sử dụng một nhiệm vụ cho mapPartitions khi chuyển đổi rdd thành dataframe
Đây là một vấn đề đối với tôi bởi vì tôi muốn đi từ:
DataFrame
->RDD
->rdd.mapPartitions
->DataFrame
để tôi có thể đọc dữ liệu (DataFrame), áp dụng một hàm không SQL cho các khối dữ liệu (mapPartitions trên RDD) và sau đó chuyển đổi trở lại một DataFrame để tôi có thể sử dụng quy trình DataFrame.write
.
Tôi có thể đi từ DataFrame -> mapPartitions và sau đó sử dụng một nhà văn RDD như saveAsTextFile nhưng đó là ít hơn lý tưởng vì quá trình DataFrame.write
có thể làm những việc như ghi đè và lưu dữ liệu ở định dạng Orc. Vì vậy, tôi muốn tìm hiểu lý do tại sao điều này đang xảy ra, nhưng từ quan điểm thực tế, tôi chủ yếu quan tâm đến việc có thể chỉ cần đi từ một DataFrame -> mapParitions -> để sử dụng quá trình DataFrame.write.
Đây là ví dụ có thể tái sản xuất. Các công trình sau đây như mong đợi, với 100 nhiệm vụ cho công tác mapPartitions
:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession \
.builder \
.master("yarn-client") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)
def f(part):
return [(1,2)]
spark_df.rdd.mapPartitions(f).collect()
Tuy nhiên, nếu dòng cuối cùng là thay đổi một cái gì đó giống như spark_df.rdd.mapPartitions(f).toDF().show()
sau đó sẽ chỉ có một nhiệm vụ cho công tác mapPartitions
.
Một số ảnh chụp màn hình minh họa dưới đây:
Các tương tự xảy ra khi gọi 'DataFrame.write' vào kết quả là tốt. – David
Bạn đang chờ công việc của mình hoàn thành? Khi tôi làm 'toDF(). Collect()', tôi thấy một giai đoạn 'runJob' với một nhiệm vụ nữa, được khởi xướng bởi' toDF' để kiểm tra lược đồ của khung dữ liệu kết quả, theo sau là một giai đoạn 'thu thập' với dự kiến 100 nhiệm vụ. – sgvd
'collect()' không phải là một điều khả thi đối với tôi trong cuộc sống thực vì kết quả cuối cùng là vài trăm GB dữ liệu. Công việc thất bại khi chạy 'DataFrame.write' chỉ với 1 tác vụ nhưng thành công khi chạy' saveAsText'. Tôi sẽ chỉnh sửa các ví dụ từ việc thu thập và hiển thị để lưu dữ liệu vì có thể có sự khác biệt giữa các dữ liệu đó. – David