2016-03-20 44 views
5

dataframes My chứa một lĩnh vực mà là một ngày và nó xuất hiện trong định dạng chuỗi, như ví dụPySpark: lọc một DataFrame bởi trường ngày trong phạm vi nơi ngày là chuỗi

'2015-07-02T11:22:21.050Z' 

tôi cần phải lọc DataFrame trên ngày chỉ nhận được các hồ sơ trong tuần trước. Vì vậy, tôi đã cố gắng một cách tiếp cận bản đồ nơi tôi chuyển ngày chuỗi các đối tượng datetime với strptime:

def map_to_datetime(row): 
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ' 
    row.date = datetime.strptime(row.date, format_string) 

df = df.map(map_to_datetime) 

và sau đó tôi sẽ áp dụng một bộ lọc như

df.filter(lambda row: 
    row.date >= (datetime.today() - timedelta(days=7))) 

tôi quản lý để có được làm việc lập bản đồ nhưng bộ lọc không thành công với

TypeError: condition should be string or Column

Có cách nào để sử dụng bộ lọc theo cách hoạt động hoặc tôi nên thay đổi cách tiếp cận và cách thực hiện?

Trả lời

5

Bạn có thể giải quyết việc này mà không cần sử dụng mã Python phía người lao động và chuyển sang RDDs. Trước hết, vì bạn sử dụng chuỗi ISO 8601, dữ liệu của bạn có thể được truyền trực tiếp đến ngày hoặc dấu thời gian:

from pyspark.sql.functions import col 

df = sc.parallelize([ 
    ('2015-07-02T11:22:21.050Z',), 
    ('2016-03-20T21:00:00.000Z',) 
]).toDF(("d_str",)) 

df_casted = df.select("*", 
    col("d_str").cast("date").alias("dt"), 
    col("d_str").cast("timestamp").alias("ts")) 

Điều này sẽ tiết kiệm một vòng giữa JVM và Python. Ngoài ra còn có một vài cách bạn có thể tiếp cận phần thứ hai. chỉ ngày:

from pyspark.sql.functions import current_date, datediff, unix_timestamp 

df_casted.where(datediff(current_date(), col("dt")) < 7) 

Dấu thời gian:

def days(i: int) -> int: 
    return 60 * 60 * 24 * i 

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7)) 

Bạn cũng có thể có một cái nhìn tại current_timestampdate_sub

Note: Tôi sẽ tránh sử dụng DataFrame.map. Thay vào đó, tốt hơn nên sử dụng DataFrame.rdd.map. Nó sẽ giúp bạn tiết kiệm một số công việc khi chuyển sang 2.0+

5

tôi đã tìm ra một cách để giải quyết vấn đề của tôi bằng cách sử dụng API SparkSQL với số ngày lưu giữ như chuỗi và thực hiện điều này:

last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d') 

new_df = df.where(df.date >= last_week) 
Các vấn đề liên quan