2017-01-05 17 views
7

Ví dụ (Python) sẽ làm cho câu hỏi của tôi rõ ràng. Hãy nói rằng tôi có một dataframe Spark lượng người đã xem phim nhất định vào những ngày nhất định, như sau:Tích lũy mảng từ các hàng trước đó (khung dữ liệu PySpark)

movierecord = spark.createDataFrame([("Alice", 1, ["Avatar"]),("Bob", 2, ["Fargo", "Tron"]),("Alice", 4, ["Babe"]), ("Alice", 6, ["Avatar", "Airplane"]), ("Alice", 7, ["Pulp Fiction"]), ("Bob", 9, ["Star Wars"])],["name","unixdate","movies"]) 

Giản đồ và dataframe xác định bởi giao diện trên như sau:

root 
|-- name: string (nullable = true) 
|-- unixdate: long (nullable = true) 
|-- movies: array (nullable = true) 
| |-- element: string (containsNull = true) 

+-----+--------+------------------+ 
|name |unixdate|movies   | 
+-----+--------+------------------+ 
|Alice|1  |[Avatar]   | 
|Bob |2  |[Fargo, Tron]  | 
|Alice|4  |[Babe]   | 
|Alice|6  |[Avatar, Airplane]| 
|Alice|7  |[Pulp Fiction] | 
|Bob |9  |[Star Wars]  | 
+-----+--------+------------------+ 

tôi thích đi từ trên để tạo cột dữ liệu mới chứa tất cả trước đây phim được xem bởi mỗi người dùng, không trùng lặp ("trước đó" trên trường không trùng lặp). Vì vậy, nó sẽ trông như thế này:

+-----+--------+------------------+------------------------+ 
|name |unixdate|movies   |previous_movies   | 
+-----+--------+------------------+------------------------+ 
|Alice|1  |[Avatar]   |[]      | 
|Bob |2  |[Fargo, Tron]  |[]      | 
|Alice|4  |[Babe]   |[Avatar]    | 
|Alice|6  |[Avatar, Airplane]|[Avatar, Babe]   | 
|Alice|7  |[Pulp Fiction] |[Avatar, Babe, Airplane]| 
|Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
+-----+--------+------------------+------------------------+ 

Làm thế nào để thực hiện điều này một cách hiệu quả?

Trả lời

5

chỉmà không giữ gìn trật tự của các đối tượng SQL:

  • bắt buộc nhập khẩu:

    import pyspark.sql.functions as f 
    from pyspark.sql.window import Window 
    
  • nét Window:

    w = Window.partitionBy("name").orderBy("unixdate") 
    
  • Hoàn thành giải pháp:

    (movierecord 
        # Flatten movies 
        .withColumn("previous_movie", f.explode("movies")) 
        # Collect unique 
        .withColumn("previous_movies", f.collect_set("previous_movie").over(w)) 
        # Drop duplicates for a single unixdate 
        .groupBy("name", "unixdate") 
        .agg(f.max(f.struct(
         f.size("previous_movies"), 
         f.col("movies").alias("movies"), 
         f.col("previous_movies").alias("previous_movies") 
        )).alias("tmp")) 
        # Shift by one and extract 
        .select(
         "name", "unixdate", "tmp.movies", 
         f.lag("tmp.previous_movies", 1).over(w).alias("previous_movies"))) 
    
  • Kết quả:

    +-----+--------+------------------+------------------------+ 
    |name |unixdate|movies   |previous_movies   | 
    +-----+--------+------------------+------------------------+ 
    |Bob |2  |[Fargo, Tron]  |null     | 
    |Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
    |Alice|1  |[Avatar]   |null     | 
    |Alice|4  |[Babe]   |[Avatar]    | 
    |Alice|6  |[Avatar, Airplane]|[Babe, Avatar]   | 
    |Alice|7  |[Pulp Fiction] |[Babe, Airplane, Avatar]| 
    +-----+--------+------------------+------------------------+ 
    

SQL một Python UDF giữ gìn trật tự:

  • Nhập khẩu:

    import pyspark.sql.functions as f 
    from pyspark.sql.window import Window 
    from pyspark.sql import Column 
    from pyspark.sql.types import ArrayType, StringType 
    
    from typing import List, Union 
    
    # https://github.com/pytoolz/toolz 
    from toolz import unique, concat, compose 
    
  • UDF:

    def flatten_distinct(col: Union[Column, str]) -> Column: 
        def flatten_distinct_(xss: Union[List[List[str]], None]) -> List[str]: 
         return compose(list, unique, concat)(xss or []) 
        return f.udf(flatten_distinct_, ArrayType(StringType()))(col) 
    
  • Window định nghĩa như trước.

  • Hoàn thành giải pháp:

    (movierecord 
        # Collect lists 
        .withColumn("previous_movies", f.collect_list("movies").over(w)) 
        # Flatten and drop duplicates 
        .withColumn("previous_movies", flatten_distinct("previous_movies")) 
        # Shift by one 
        .withColumn("previous_movies", f.lag("previous_movies", 1).over(w)) 
        # For presentation only 
        .orderBy("unixdate")) 
    
  • Kết quả:

    +-----+--------+------------------+------------------------+ 
    |name |unixdate|movies   |previous_movies   | 
    +-----+--------+------------------+------------------------+ 
    |Alice|1  |[Avatar]   |null     | 
    |Bob |2  |[Fargo, Tron]  |null     | 
    |Alice|4  |[Babe]   |[Avatar]    | 
    |Alice|6  |[Avatar, Airplane]|[Avatar, Babe]   | 
    |Alice|7  |[Pulp Fiction] |[Avatar, Babe, Airplane]| 
    |Bob |9  |[Star Wars]  |[Fargo, Tron]   | 
    +-----+--------+------------------+------------------------+ 
    

Performance:

Tôi tin rằng không có cách nào hiệu quả để giải quyết việc này cho những hạn chế .Không chỉ yêu cầu đầu ra yêu cầu sao chép dữ liệu đáng kể (dữ liệu được mã hóa nhị phân để phù hợp với định dạng Tungsten, do đó bạn có thể nén được nhưng nhận dạng đối tượng lỏng lẻo), nhưng cũng có một số hoạt động đắt tiền.

Điều này sẽ ổn nếu kích thước mong đợi của previous_movies bị giới hạn và nhỏ nhưng sẽ không khả thi nói chung.

Sao chép dữ liệu khá dễ dàng để giải quyết bằng cách giữ lịch sử đơn lẻ, lười biếng cho người dùng. Không phải cái gì có thể được thực hiện trong SQL nhưng khá dễ dàng với các hoạt động RDD mức thấp.

Phát nổ và collect_ mẫu đắt tiền. Nếu yêu cầu của bạn là nghiêm ngặt nhưng bạn muốn cải thiện hiệu suất, bạn có thể sử dụng Scala UDF thay cho Python một.

Các vấn đề liên quan