2016-12-20 32 views
7

Điều gì là thích hợp và cách nhanh nhất để đọc dữ liệu Cassandra vào gấu trúc? Bây giờ tôi sử dụng đoạn mã sau nhưng nó rất chậm ...Python đọc dữ liệu Cassandra vào gấu trúc

import pandas as pd 

from cassandra.cluster import Cluster 
from cassandra.auth import PlainTextAuthProvider 
from cassandra.query import dict_factory 

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS) 
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT, 
    auth_provider=auth_provider) 

session = cluster.connect(CASSANDRA_DB) 
session.row_factory = dict_factory 

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE) 

df = pd.DataFrame() 

for row in session.execute(sql_query): 
    df = df.append(pd.DataFrame(row, index=[0])) 

df = df.reset_index(drop=True).fillna(pd.np.nan) 

Reading 1000 hàng mất 1 phút, và tôi có "hơn chút" một ... Nếu tôi chạy ví dụ cùng một truy vấn. trong DBeaver, tôi nhận được toàn bộ kết quả (~ 40k hàng) trong vòng một phút.

Cảm ơn bạn !!!

+0

Nếu đầu ra của 'session.execute (sql_query)' là danh sách các dicts, tôi muốn thử chỉ 'df = pd.DataFrame (session.execute (sql_query))' hoặc chạy 'pd.DataFrame' trên một số phần của danh sách này. Việc thêm các hàng vào một khung dữ liệu từng cái một là không hiệu quả. – ptrj

+0

Kết quả của 'session.execute (sql_query)' là một đối tượng đặc biệt '' iterable. Các hàng của nó có thể là các bộ dữ liệu, tên_tập_tập hoặc từ điển. – ragesz

+0

Tôi hiểu. Tuy nhiên, tốt hơn hết nên chuyển nó thành danh sách đầu tiên, ví dụ 'lst = []; cho hàng trong phiên ...: lst.append (hàng) 'nếu không có gì khác hoạt động. Và sau đó ghép các kết quả: 'df = pd.concat (lst)'. Bằng cách này, bạn có thể tránh các cuộc gọi 40k tốn kém đến 'pd.DataFrame.append'. – ptrj

Trả lời

13

tôi có câu trả lời ở chính thức mailing list (nó hoạt động hoàn hảo):

Hi,

cố gắng để xác định riêng nhà máy gấu trúc hàng của bạn:

def pandas_factory(colnames, rows): 
    return pd.DataFrame(rows, columns=colnames) 

session.row_factory = pandas_factory 
session.default_fetch_size = None 

query = "SELECT ..." 
rslt = session.execute(query, timeout=None) 
df = rslt._current_rows 

Đó là cách tôi làm điều đó - nó phải nhanh hơn ...

Nếu bạn tìm thấy phương pháp nhanh hơn - tôi quan tâm đến :)

Michael

3

Những gì tôi làm (trong python 3) là:

query = "SELECT ..." 
df = pd.DataFrame(list(session.execute(query))) 
Các vấn đề liên quan