2015-09-23 14 views
12

Tôi có một RDD với một tuple của giá trị (String, SparseVector) và tôi muốn tạo một DataFrame sử dụng RDD. Để có được một (label: string, features: vector) DataFrame là Schema được yêu cầu bởi hầu hết các thư viện của thuật toán ml. Tôi biết điều đó có thể được thực hiện bởi vì HashingTF Thư viện ml xuất ra một véc tơ khi được cung cấp cột tính năng của DataFrame.Làm thế nào để chuyển đổi một RDD với một cột SparseVector đến một DataFrame với một cột như Vector

temp_df = sqlContext.createDataFrame(temp_rdd, StructType([ 
     StructField("label", DoubleType(), False), 
     StructField("tokens", ArrayType(StringType()), False) 
    ])) 

#assumming there is an RDD (double,array(strings)) 

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features") 

ndf = hashingTF.transform(temp_df) 
ndf.printSchema() 

#outputs 
#root 
#|-- label: double (nullable = false) 
#|-- tokens: array (nullable = false) 
#| |-- element: string (containsNull = true) 
#|-- features: vector (nullable = true) 

Vì vậy, câu hỏi của tôi là, tôi bằng cách nào đó có thể có một RDD của (String, SparseVector) chuyển nó sang một DataFrame của (String, vector). Tôi đã thử với thông số sqlContext.createDataFrame nhưng không có DataType phù hợp với nhu cầu của tôi.

df = sqlContext.createDataFrame(rdd,StructType([ 
     StructField("label" , StringType(),True), 
     StructField("features" , ?Type(),True) 
    ])) 

Trả lời

17

Bạn phải sử dụng VectorUDT đây:

# In Spark 1.x 
# from pyspark.mllib.linalg import SparseVector, VectorUDT 
from pyspark.ml.linalg import SparseVector, VectorUDT 

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 

## root 
## |-- label: double (nullable = true) 
## |-- features: vector (nullable = true) 

Chỉ cần cho đầy đủ Scala tương đương:

import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.types.{DoubleType, StructType} 
// In Spark 1x. 
// import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} 
import org.apache.spark.ml.linalg.Vectors 
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType 

val schema = new StructType() 
    .add("label", DoubleType) 
    // In Spark 1.x 
    //.add("features", new VectorUDT()) 
    .add("features",VectorType) 

val temp_rdd: RDD[Row] = sc.parallelize(Seq(
    Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))), 
    Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5)))) 
)) 

spark.createDataFrame(temp_rdd, schema).printSchema 

// root 
// |-- label: double (nullable = true) 
// |-- features: vector (nullable = true) 
+2

Wow, tôi đã tìm kiếm điều này trong thời đại! gần như khóc hạnh phúc:,) +1 –

+1

Điều này đã hiệu quả! Cảm ơn nhiều! bạn có thể cho tôi biết trong tài liệu đó ở đâu không? không thể tìm thấy bất kỳ VectorUDT trên linalg apache spark Docs –

+0

@OrangelMarquez có thể yêu cầu kéo là bắt buộc –

4

Trong khi @ zero323 trả lời https://stackoverflow.com/a/32745924/1333621 có ý nghĩa, và tôi muốn nó làm việc cho tôi - rdd nằm bên dưới khung dữ liệu, sqlContext.createDataFrame (temp_rdd, schema), các loại SparseVectors vẫn còn chứa tôi phải làm những điều sau đây để chuyển đổi sang loại DenseVector - nếu ai đó có một cách ngắn hơn/tốt hơn tôi muốn biết

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 
df_w_ftr = temp_rdd.toDF(schema) 

print 'original convertion method: ',df_w_ftr.take(5) 
print('\n') 
temp_rdd_dense = temp_rdd.map(lambda x: Row(label=x[0],features=DenseVector(x[1].toArray()))) 
print type(temp_rdd_dense), type(temp_rdd) 
print 'using map and toArray:', temp_rdd_dense.take(5) 

temp_rdd_dense.toDF().show() 

root 
|-- label: double (nullable = true) 
|-- features: vector (nullable = true) 

original convertion method: [Row(label=0.0, features=SparseVector(4, {1: 1.0, 3: 5.5})), Row(label=1.0, features=SparseVector(4, {0: -1.0, 2: 0.5}))] 


<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.RDD'> 
using map and toArray: [Row(features=DenseVector([0.0, 1.0, 0.0, 5.5]), label=0.0), Row(features=DenseVector([-1.0, 0.0, 0.5, 0.0]), label=1.0)] 

+------------------+-----+ 
|   features|label| 
+------------------+-----+ 
| [0.0,1.0,0.0,5.5]| 0.0| 
|[-1.0,0.0,0.5,0.0]| 1.0| 
+------------------+-----+ 
1

đây là một ví dụ trong scala cho spark 2,1

import org.apache.spark.ml.linalg.Vector 

def featuresRDD2DataFrame(features: RDD[Vector]): DataFrame = { 
    import sparkSession.implicits._ 
    val rdd: RDD[(Double, Vector)] = features.map(x => (0.0, x)) 
    val df = rdd.toDF("label","features").select("features") 
    df 
    } 

các toDF() đã không được công nhận bởi trình biên dịch trên các tính năng rdd

Các vấn đề liên quan