2015-11-26 21 views
8

Tôi đang cố tạo một hàm tổng hợp do người dùng định nghĩa (UDAF) trong Java bằng cách sử dụng Apache Spark SQL trả về nhiều mảng khi hoàn thành. Tôi đã tìm kiếm trực tuyến và không thể tìm thấy bất kỳ ví dụ hoặc đề xuất nào về cách thực hiện việc này.Trả về nhiều mảng từ hàm tổng hợp do người dùng định nghĩa (UDAF) trong Apache Spark SQL

Tôi có thể trả về một mảng, nhưng không thể tìm ra cách lấy dữ liệu theo đúng định dạng trong phương thức evaluate() để trả về nhiều mảng.

UDAF hoạt động như tôi có thể in ra các mảng trong phương thức evaluate(), tôi không thể tìm ra cách trả về các mảng đó cho mã gọi (được hiển thị dưới đây để tham khảo).

UserDefinedAggregateFunction customUDAF = new CustomUDAF(); 
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data"); 

Tôi đã bao gồm toàn bộ lớp UDAF tùy chỉnh bên dưới, nhưng các phương thức chính là dataType() và phương thức đánh giá(), được hiển thị đầu tiên.

Bất kỳ trợ giúp hoặc lời khuyên nào sẽ được đánh giá cao. Cảm ơn bạn.

public class CustomUDAF extends UserDefinedAggregateFunction { 

    @Override 
    public DataType dataType() { 
     // TODO: Is this the correct way to return 2 arrays? 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public Object evaluate(Row buffer) { 
     // Data conversion 
     List<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     List<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 

     // Processing of data (omitted) 

     // TODO: How to get data into format needed to return 2 arrays? 
     return dataList; 
    } 

    @Override 
    public StructType inputSchema() { 
     return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType); 
    } 

    @Override 
    public StructType bufferSchema() { 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public void initialize(MutableAggregationBuffer buffer) { 
     buffer.update(0, new ArrayList<Long>()); 
     buffer.update(1, new ArrayList<Double>()); 
    } 

    @Override 
    public void update(MutableAggregationBuffer buffer, Row row) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     longList.add(row.getLong(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 
     dataList.add(row.getDouble(1)); 

     buffer.update(0, longList); 
     buffer.update(1, dataList); 
    } 

    @Override 
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0)); 
     longList.addAll(buffer2.getList(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1)); 
     dataList.addAll(buffer2.getList(1)); 

     buffer1.update(0, longList); 
     buffer1.update(1, dataList); 
    } 

    @Override 
    public boolean deterministic() { 
     return true; 
    } 
} 

Cập nhật: Dựa trên câu trả lời bởi zero323 tôi đã có thể quay trở lại hai mảng sử dụng:

return new Tuple2<>(longArray, dataArray); 

Lấy dữ liệu trong số này đã được một chút của một cuộc đấu tranh nhưng liên quan đến giải cấu trúc các DataFrame vào danh sách Java và sau đó xây dựng nó trở lại một DataFrame.

Trả lời

5

Theo như tôi có thể nói trả về một bộ túp thì chỉ vừa đủ. Trong Scala:

import org.apache.spark.sql.expressions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.{Row, Column} 

object DummyUDAF extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType() 
    .add("buff", ArrayType(LongType)) 
    .add("buff2", ArrayType(DoubleType)) 
    def dataType = new StructType() 
    .add("xs", ArrayType(LongType)) 
    .add("ys", ArrayType(DoubleType)) 
    def deterministic = true 
    def initialize(buffer: MutableAggregationBuffer) = {} 
    def update(buffer: MutableAggregationBuffer, input: Row) = {} 
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {} 
    def evaluate(buffer: Row) = (Array(1L, 2L, 3L), Array(1.0, 2.0, 3.0)) 
} 

val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("k", "v") 
df.select(DummyUDAF($"k")).show(1, false) 

// +---------------------------------------------------+ 
// |(DummyUDAF$(k),mode=Complete,isDistinct=false)  | 
// +---------------------------------------------------+ 
// |[WrappedArray(1, 2, 3),WrappedArray(1.0, 2.0, 3.0)]| 
// +---------------------------------------------------+ 
+0

Tôi đang trả về một nhóm (thấp, trung bình, cao) như một sự tự tin giữa UDAF. Có cách nào để phát tuple này thành nhiều cột để thay vì có '| key | [1.0,1.5,2.0] |' Tôi nhận được '| key | 1.0 | 1.5 | 2.0 |' – TomTom101

+0

@ TomTom101 Nếu nó là một tuple (struct field) chọn đơn giản là đủ. – zero323

+0

Đáng ngạc nhiên không phải là lừa! Tôi đã trở lại bây giờ là một trường hợp lớp cho khả năng đọc tốt hơn (nên đã cố gắng mà trước khi gửi bài). cám ơn! – TomTom101

Các vấn đề liên quan