2015-06-29 25 views
9

Bắt đầu với một DataFrame Spark để tạo ma trận véc tơ để xử lý phân tích thêm.Lặp lại thông qua Spark RDD

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache() 
feature_matrix_vectors.first() 

Đầu ra là một mảng vectơ. Một số trong những vector có một null trong họ

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]) 
... 
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null]) 

Từ này tôi muốn lặp thông qua ma trận vector và tạo ra một mảng LabeledPoint với 0 (zero) nếu vector chứa một null, nếu không có một 1.

def f(row): 
    if row.contain(None): 
     LabeledPoint(1.0,row) 
    else: 
     LabeledPoint(0.0,row) 

tôi đã cố gắng để lặp qua ma trận vector sử dụng

feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) # create a generator of row sums 
next(feature_matrix_labeledPoint) # Run the iteration protocol 

nhưng điều này không hoạt động.

TypeError: 'PipelinedRDD' object is not iterable 

Bất kỳ trợ giúp sẽ là tuyệt vời

+0

Câu trả lời SO này có các chi tiết http://stackoverflow.com/a/25296061/429476 –

Trả lời

7

RDDs không phải là thả để thay thế cho một danh sách Python. Bạn phải sử dụng một trong hai hành động hoặc phép biến đổi có sẵn trên RDD. Tại đây bạn chỉ cần sử dụng map:

from pyspark.mllib.linalg import DenseVector 
from pyspark.mllib.regression import LabeledPoint 


feature_matrix_vectors = sc.parallelize([ 
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]), 
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None]) 
]) 

(feature_matrix_vectors 
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v)) 
    .collect()) 
Các vấn đề liên quan