2016-04-28 16 views
8

Tôi là một ứng dụng tia lửa với một số điểm mà tôi muốn duy trì trạng thái hiện tại. Điều này thường là sau một bước lớn, hoặc bộ nhớ đệm một trạng thái mà tôi muốn sử dụng nhiều lần. Có vẻ như khi tôi gọi bộ nhớ cache trên khung dữ liệu của tôi lần thứ hai, một bản sao mới được lưu vào bộ nhớ cache. Trong ứng dụng của tôi, điều này dẫn đến các vấn đề về bộ nhớ khi mở rộng quy mô. Mặc dù, một khung dữ liệu nhất định là tối đa khoảng 100 MB trong các thử nghiệm hiện tại của tôi, kích thước tích lũy của các kết quả trung gian phát triển vượt ra ngoài bộ nhớ được phân bổ trên trình thực thi. Xem dưới đây cho một ví dụ nhỏ cho thấy hành vi này.Un-persististing tất cả các dataframes trong (py) spark

cache_test.py:

from pyspark import SparkContext, HiveContext 

spark_context = SparkContext(appName='cache_test') 
hive_context = HiveContext(spark_context) 

df = (hive_context.read 
     .format('com.databricks.spark.csv') 
     .load('simple_data.csv') 
    ) 
df.cache() 
df.show() 

df = df.withColumn('C1+C2', df['C1'] + df['C2']) 
df.cache() 
df.show() 

spark_context.stop() 

simple_data.csv:

1,2,3 
4,5,6 
7,8,9 

Nhìn vào giao diện người dùng ứng dụng, có một bản sao của dataframe gốc, trong adition một với cột mới . Tôi có thể xóa bản gốc bằng cách gọi df.unpersist() trước dòng withColumn. Đây có phải là cách được đề xuất để xóa kết quả trung gian được lưu trong bộ nhớ cache (tức là không cần phải gọi trước mỗi cache()).

Ngoài ra, bạn có thể xóa tất cả các đối tượng được lưu trong bộ nhớ cache. Trong ứng dụng của tôi, có những điểm ngắt tự nhiên, nơi tôi có thể đơn giản xóa tất cả bộ nhớ và chuyển sang tệp tiếp theo. Tôi muốn làm điều này mà không cần tạo một ứng dụng tia lửa mới cho mỗi tệp đầu vào.

Cảm ơn bạn trước!

Trả lời

11

Spark 2.x

Bạn có thể sử dụng Catalog.clearCache:

from pyspark.sql import SparkSession 

spark = SparkSession.builder.getOrCreate 
... 
spark.catalog.clearCache() 

Spark 1.x

Bạn có thể sử dụng phương pháp SQLContext.clearCache

Loại bỏ tất cả các bảng được lưu trong bộ nhớ cache từ bộ nhớ cache trong bộ nhớ.

from pyspark.sql import SQLContext 
from pyspark import SparkContext 

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate()) 
... 
sqlContext.clearCache() 
+1

Đây là một giải pháp tốt cho bây giờ vì nó cho phép tôi để xóa toàn bộ bộ nhớ cache tại điểm ngắt hợp lý. Tôi sẽ kết hợp điều này, nhưng tôi lo lắng khi tôi mở rộng quy mô và bắt đầu làm việc với các bộ dữ liệu lớn hơn, bộ nhớ cache cũ của tôi bắt đầu phát triển ngoài tầm kiểm soát. Nếu tôi muốn xóa bộ đệm cũ khi tôi đi, là đề xuất tạo một biến mới (hoặc biến tạm thời), và loại bỏ các đối tượng cũ một cách rõ ràng. Một cái gì đó như: 'df.cache()'; 'df_new = df.withColumn ('C1 + C2', df ['C1'] + df ['C2'])'; 'df_new.cache()'; 'df.unpersist()'. Điều này có vẻ hơi cồng kềnh nếu đó là cách duy nhất ... – bjack3

+0

Thông thường không cần xóa bộ nhớ cache rõ ràng. Nó được làm sạch tự động khi cần thiết. – zero323

+0

Tôi lo rằng tôi đang làm gì đó sai. Trong ứng dụng đầy đủ của tôi, công việc của tôi cuối cùng sẽ sụp đổ do lỗi ngoài bộ nhớ.Mỗi bản sao riêng lẻ của một khung dữ liệu là hợp lý nhỏ (dưới 100 MB), nhưng bộ nhớ cache dường như tồn tại mãi mãi; ngay cả sau khi viết đầu ra cho một tập tin, và chuyển sang các bước tiếp theo. Tôi sẽ xem nếu tôi có thể tạo ra một ví dụ làm việc nhỏ hơn để hiển thị điều này trong hành động. – bjack3

1

Chúng tôi sử dụng này khá thường xuyên

for (id, rdd) in sc._jsc.getPersistentRDDs().items(): 
    rdd.unpersist() 
0

thể riêng unpersist tất cả df của:

firstDF.unpersist() 
secondDF.unpersist() 
Các vấn đề liên quan