2013-04-27 52 views
51

Tôi có một lượng lớn dữ liệu trong một bộ sưu tập trong mongodb mà tôi cần phân tích. Làm cách nào để nhập dữ liệu đó vào gấu trúc?Làm cách nào để nhập dữ liệu từ mongodb sang gấu trúc?

Tôi mới làm quen với gấu trúc và vón cục.

EDIT: Bộ sưu tập mongodb chứa các giá trị cảm biến được gắn thẻ ngày và giờ. Các giá trị cảm biến là kiểu dữ liệu float.

mẫu dữ liệu:

{ 
"_cls" : "SensorReport", 
"_id" : ObjectId("515a963b78f6a035d9fa531b"), 
"_types" : [ 
    "SensorReport" 
], 
"Readings" : [ 
    { 
     "a" : 0.958069536790466, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"), 
     "b" : 6.296118156595, 
     "_cls" : "Reading" 
    }, 
    { 
     "a" : 0.95574014778624, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"), 
     "b" : 6.29651468650064, 
     "_cls" : "Reading" 
    }, 
    { 
     "a" : 0.953648289182713, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"), 
     "b" : 7.29679823731148, 
     "_cls" : "Reading" 
    }, 
    { 
     "a" : 0.955931884300997, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"), 
     "b" : 6.29642922525632, 
     "_cls" : "Reading" 
    }, 
    { 
     "a" : 0.95821381, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"), 
     "b" : 7.28956613, 
     "_cls" : "Reading" 
    }, 
    { 
     "a" : 4.95821335, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"), 
     "b" : 6.28956574, 
     "_cls" : "Reading" 
    }, 
    { 
     "a" : 9.95821341, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"), 
     "b" : 0.28956488, 
     "_cls" : "Reading" 
    }, 
    { 
     "a" : 1.95667927, 
     "_types" : [ 
      "Reading" 
     ], 
     "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"), 
     "b" : 0.29115237, 
     "_cls" : "Reading" 
    } 
], 
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"), 
"sensorName" : "56847890-0", 
"reportCount" : 8 
} 
+0

Sử dụng [một loại lĩnh vực tùy chỉnh] (https://gist.github.com/jdthorpe/93145e8093258a3b73b2bd458533176d) với MongoEngine có thể làm cho việc lưu trữ và lấy Pandas DataFrames đơn giản như 'mongo_doc.data_frame = my_pandas_df' – Jthorpe

Trả lời

74

pymongo có thể cung cấp cho bạn một bàn tay, Dưới đây là một số mã Tôi đang sử dụng :

import pandas as pd 
from pymongo import MongoClient 


def _connect_mongo(host, port, username, password, db): 
    """ A util for making a connection to mongo """ 

    if username and password: 
     mongo_uri = 'mongodb://%s:%[email protected]%s:%s/%s' % (username, password, host, port, db) 
     conn = MongoClient(mongo_uri) 
    else: 
     conn = MongoClient(host, port) 


    return conn[db] 


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True): 
    """ Read from Mongo and Store into DataFrame """ 

    # Connect to MongoDB 
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db) 

    # Make a query to the specific DB and Collection 
    cursor = db[collection].find(query) 

    # Expand the cursor and construct the DataFrame 
    df = pd.DataFrame(list(cursor)) 

    # Delete the _id 
    if no_id: 
     del df['_id'] 

    return df 
+0

Cảm ơn, đây là phương pháp tôi đã sử dụng. Tôi cũng có một loạt các tài liệu nhúng trong mỗi hàng. Vì vậy, tôi đã phải lặp lại điều đó cũng như trong mỗi hàng. Có cách nào tốt hơn để làm điều này?? – Nithin

+0

Có thể cung cấp một số mẫu cấu trúc mongodb của bạn không? – waitingkuo

+0

Xem xét chỉnh sửa cho hàng dữ liệu mẫu. Một mảng của tài liệu nhúng "Đọc" được lưu trữ bên trong các bài đọc. Bây giờ tôi đang thực hiện một truy vấn để tìm nhiều bản ghi và sau đó lặp qua từng lần đọc trong mảng đọc cho mỗi bản ghi. Có cách nào dễ dàng hơn để nhập dữ liệu trong trường hợp của tôi không? – Nithin

16

Monary thực hiện chính xác điều đó và nó là siêu nhanh. (another link)

Xem this cool post bao gồm hướng dẫn nhanh và một số thời gian.

+0

Monary có hỗ trợ kiểu dữ liệu chuỗi không? –

+0

Tôi đã thử Monary, nhưng phải mất rất nhiều thời gian. Tôi có thiếu một số tối ưu hóa không? Cố gắng 'client = Monary (host, 27.017, cơ sở dữ liệu = "db_tmp") \t \t cột = [ "col1", "col2"] \t \t data_type = [ "Int64", "Int64"] \t \t mảng = client .query ("db_tmp", "coll", {}, columns, data_type) ' Đối với bản ghi' 50000' mất khoảng 200s'. –

+0

Nghe có vẻ rất chậm ... Thật tình, tôi không biết tình trạng của dự án này là gì, bây giờ, 4 năm sau ... – shx2

14

Bạn có thể tải dữ liệu mongodb của mình vào gấu trúc DataFrame sử dụng mã này. Nó làm việc cho tôi. Hy vọng cho bạn quá.

import pymongo 
import pandas as pd 
from pymongo import MongoClient 
client = MongoClient() 
db = client.database_name 
collection = db.collection_name 
data = pd.DataFrame(list(collection.find())) 
1

Sử dụng

pandas.DataFrame(list(...)) 

sẽ tiêu tốn rất nhiều bộ nhớ nếu kết quả iterator/máy phát điện là lớn

tốt hơn để tạo ra khối nhỏ và concat vào cuối

def iterator2dataframes(iterator, chunk_size: int): 
    """Turn an iterator into multiple small pandas.DataFrame 

    This is a balance between memory and efficiency 
    """ 
    records = [] 
    frames = [] 
    for i, record in enumerate(iterator): 
    records.append(record) 
    if i % chunk_size == chunk_size - 1: 
     frames.append(pd.DataFrame(records)) 
     records = [] 
    if records: 
    frames.append(pd.DataFrame(records)) 
    return pd.concat(frames) 
3

Để xử lý dữ liệu ngoài lõi (không phù hợp với RAM) một cách hiệu quả (nghĩa là với việc thực thi song song), bạn có thể thử Python Blaze ecosystem: Blaze/Dask/Odo.

Blaze (và Odo) có các chức năng ngoài hộp để xử lý MongoDB.

Một vài điều hữu ích để bắt đầu:

Và một bài báo trong đó cho thấy những điều tuyệt vời là có thể với Blaze stack: Analyzing 1.7 Billion Reddit Comments with Blaze and Impala (về cơ bản, truy vấn 975 Gb nhận xét Reddit tính bằng giây).

P.S. Tôi không liên kết với bất kỳ công nghệ nào trong số này.

+1

Tôi cũng đã viết một [post] (https://github.com/ goldan/data-science/blob/master/notebook/dask-performance.ipynb) bằng cách sử dụng Jupyter Notebook với ví dụ cách Dask giúp tăng tốc thực thi ngay cả khi dữ liệu được lắp vào bộ nhớ bằng cách sử dụng nhiều lõi trên một máy. –

7
import pandas as pd 
from odo import odo 

data = odo('mongodb://localhost/db::collection', pd.DataFrame) 
6

Theo PEP, đơn giản là tốt hơn so với phức tạp:

import pandas as pd 
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find()) 

Bạn có thể bao gồm các điều kiện như bạn sẽ làm việc với cơ sở dữ liệu MongoDB thường xuyên hoặc thậm chí sử dụng find_one() để có được chỉ có một phần tử từ cơ sở dữ liệu , v.v.

và thì đấy!

0

Thực hiện theo câu trả lời tuyệt vời này theo số waitingkuo Tôi muốn thêm khả năng thực hiện việc đó bằng cách sử dụng chunksize phù hợp với .read_sql().read_csv(). Tôi mở rộng câu trả lời từ Deu Leung bằng cách tránh từng người một từng 'bản ghi' của 'trình lặp'/'con trỏ'. Tôi sẽ mượn chức năng read_mongo trước đó.

def read_mongo(db, 
      collection, query={}, 
      host='localhost', port=27017, 
      username=None, password=None, 
      chunksize = 100, no_id=True): 
""" Read from Mongo and Store into DataFrame """ 


# Connect to MongoDB 
#db = _connect_mongo(host=host, port=port, username=username, password=password, db=db) 
client = MongoClient(host=host, port=port) 
# Make a query to the specific DB and Collection 
db_aux = client[db] 


# Some variables to create the chunks 
skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize)) 
if len(skips_variable)<=1: 
    skips_variable = [0,len(skips_variable)] 

# Iteration to create the dataframe in chunks. 
for i in range(1,len(skips_variable)): 

    # Expand the cursor and construct the DataFrame 
    #df_aux =pd.DataFrame(list(cursor_aux[skips_variable[i-1]:skips_variable[i]])) 
    df_aux =pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]])) 

    if no_id: 
     del df_aux['_id'] 

    # Concatenate the chunks into a unique df 
    if 'df' not in locals(): 
     df = df_aux 
    else: 
     df = pd.concat([df, df_aux], ignore_index=True) 

return df 
Các vấn đề liên quan