2015-07-22 63 views
8

Tôi có một DataFrame trong Apache Spark với một mảng các số nguyên, nguồn là một tập hợp các hình ảnh. Cuối cùng tôi muốn làm PCA trên nó, nhưng tôi đang gặp khó khăn chỉ cần tạo ra một ma trận từ mảng của tôi. Làm cách nào để tạo ma trận từ RDD?Apache Spark: Cách tạo ma trận từ một DataFrame?

> imagerdd = traindf.map(lambda row: map(float, row.image)) 
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd) 
Traceback (most recent call last): 

    File "<ipython-input-21-6fdaa8cde069>", line 2, in <module> 
mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd) 

    File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__ 
values = self._convert_to_array(values, np.float64) 

    File  "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array 
    return np.asarray(array_like, dtype=dtype) 

    File "/usr/local/python/conda/lib/python2.7/site-  packages/numpy/core/numeric.py", line 462, in asarray 
    return array(a, dtype, copy=False, order=order) 

TypeError: float() argument must be a string or a number 

Tôi nhận được lỗi tương tự từ tất cả các thỏa thuận có thể tôi có thể nghĩ:

imagerdd = traindf.map(lambda row: Vectors.dense(row.image)) 
imagerdd = traindf.map(lambda row: row.image) 
imagerdd = traindf.map(lambda row: np.array(row.image)) 

Nếu tôi cố gắng

> imagedf = traindf.select("image") 
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf) 

Traceback (cuộc gọi gần đây nhất cuối cùng):

File "<ipython-input-26-a8cbdad10291>", line 2, in <module> 
mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf) 

    File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__ 
    values = self._convert_to_array(values, np.float64) 

    File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array 
    return np.asarray(array_like, dtype=dtype) 

    File "/usr/local/python/conda/lib/python2.7/site-packages/numpy/core/numeric.py", line 462, in asarray 
    return array(a, dtype, copy=False, order=order) 

ValueError: setting an array element with a sequence. 

Trả lời

7

Vì bạn không cung cấp ví dụ đầu vào Tôi sẽ giả sử nó trông nhiều hơn hoặc ít hơn như thế này, nơi id là một số hàng và image chứa các giá trị.

traindf = sqlContext.createDataFrame([ 
    (1, [1, 2, 3]), 
    (2, [4, 5, 6]), 
    (3, (7, 8, 9)) 
], ("id", "image")) 

Điều đầu tiên bạn phải hiểu là DenseMatrix là một cấu trúc dữ liệu địa phương. Để chính xác, đó là trình bao bọc xung quanh numpy.ndarray. Hiện tại (Spark 1.4.1) không có tương đương phân tán trong PySpark MLlib.

Ma trận dày đặc lấy ba đối số bắt buộc numRows, numCols, values trong đó values là cấu trúc dữ liệu cục bộ. Trong trường hợp của bạn, bạn phải thu thập đầu tiên:

values = (traindf. 
    rdd. 
    map(lambda r: (r.id, r.image)). # Extract row id and data 
    sortByKey(). # Sort by row id 
    flatMap(lambda (id, image): image). 
    collect()) 


ncol = len(traindf.rdd.map(lambda r: r.image).first()) 
nrow = traindf.count() 

dm = DenseMatrix(nrow, ncol, values) 

Cuối cùng:

> print dm.toArray() 
[[ 1. 4. 7.] 
[ 2. 5. 8.] 
[ 3. 6. 9.]] 

Sửa:

Trong Spark 1.5 + bạn có thể sử dụng mllib.linalg.distributed như sau:

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix 

mat = IndexedRowMatrix(traindf.map(lambda row: IndexedRow(*row))) 
mat.numRows() 
## 4 
mat.numCols() 
## 3 

mặc dù hiện tại API vẫn bị giới hạn trong b e hữu ích trong thực tế.

+0

Bạn có biết cách làm tương tự với scala không? https://stackoverflow.com/questions/47010126/calculate-cosine-similarity-spark-dataframe –

Các vấn đề liên quan