Nếu bạn thực sự cần phải sử dụng chức năng của bạn, tôi có thể đề nghị hai lựa chọn:
1) Sử dụng bản đồ/toDF:
import org.apache.spark.sql.Row
import sqlContext.implicits._
def getTimestamp: (String => java.sql.Timestamp) = // your function here
val test = myDF.select("my_column").rdd.map {
case Row(string_val: String) => (string_val, getTimestamp(string_val))
}.toDF("my_column", "new_column")
2) Sử dụng UDFs (UserDefinedFunction
):
import org.apache.spark.sql.functions._
def getTimestamp: (String => java.sql.Timestamp) = // your function here
val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column
val test = myDF.withColumn("new_column", newCol) // adds the new column to original DF
Có thêm chi tiết về Spark SQL UDFs trong this nice article by Bill Chambers.
Ngoài ra,
Nếu bạn chỉ muốn chuyển một cột StringType
vào một cột TimestampType
bạn có thể sử dụng unix_timestamp
column function sẵn từ Spark SQL 1.5:
val test = myDF
.withColumn("new_column", unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp"))
Lưu ý: Đối với spark 1.5.x, cần nhân kết quả của unix_timestamp
trước 1000
trước khi truyền tới dấu thời gian (iss ue SPARK-11724). Mã kết quả sẽ là:
val test = myDF
.withColumn("new_column", (unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm") *1000L).cast("timestamp"))
Edit: Thêm tùy chọn udf
Nguồn
2016-05-04 23:48:23
Thanks for the help. Vấn đề duy nhất tôi có là khi tôi làm df.withColumn ("start_date", unix_timestamp (df1 ("start_date"), "yyyy-MM-dd HH: mm: ss"). Cast ("dấu thời gian")) , ngày của tôi bị chuyển đổi sai. Ví dụ: 2013-08-12 06:40:54 được chuyển đổi thành 1970-01-16 22: 18: 09.654. Bạn có biết chuyện gì đang diễn ra không? – mt88
Đối với tia lửa 1.5 bạn phải nhân với 1000 trước khi cast –
cảm ơn! nó đã làm việc – mt88