2014-09-02 15 views
6

Để cung cấp nhiều bối cảnh nhất có thể/tôi cần, tôi đang cố gắng lấy một số dữ liệu được lưu trữ trên máy chủ postgres từ xa (heroku) vào DataFrame gấu trúc, sử dụng psycopg2 để kết nối.Kéo số lượng lớn dữ liệu từ máy chủ từ xa vào DataFrame

Tôi quan tâm đến hai bảng cụ thể, người dùngsự kiện, và kết nối hoạt động tốt, bởi vì khi kéo xuống dữ liệu người dùng

import pandas.io.sql as sql 
# [...] 
users = sql.read_sql("SELECT * FROM users", conn) 

sau khi chờ đợi một vài giây, DataFrame là trở lại như mong đợi.

<class 'pandas.core.frame.DataFrame'> 
Int64Index: 67458 entries, 0 to 67457 
Data columns (total 35 columns): [...] 

Tuy nhiên, khi cố gắng kéo lớn hơn, nặng hơn sự kiện dữ liệu trực tiếp từ ipython, sau một thời gian dài, nó chỉ bị treo:

In [11]: events = sql.read_sql("SELECT * FROM events", conn) 
[email protected]:~$ 

và khi cố gắng từ một máy tính xách tay ipython tôi nhận được Hạt nhân chết lỗi

Hạt nhân đã chết, bạn có muốn khởi động lại không? Nếu bạn không khởi động lại hạt nhân, bạn sẽ có thể lưu sổ ghi chép, nhưng mã chạy sẽ không hoạt động cho đến khi sổ ghi chép được mở lại.


Update # 1:

Để có được một ý tưởng tốt hơn về kích thước của sự kiện bảng Tôi đang cố gắng để kéo trong, đây là số lượng hồ sơ và số các thuộc tính cho mỗi:

In [11]: sql.read_sql("SELECT count(*) FROM events", conn) 
Out[11]: 
    count 
0 2711453 

In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns) 
Out[12]: 18 

Cập nhật # 2:

Memory chắc chắn là một nút cổ chai cho việc thực hiện hiện tại của read_sql: khi kéo xuống sự kiện và cố gắng để chạy một thể hiện của ipython kết quả là

[email protected]:~$ sudo ipython 
-bash: fork: Cannot allocate memory 

Cập nhật # 3:

Lần đầu tiên tôi thực hiện với triển khai read_sql_chunked sẽ chỉ r eturn mảng của một phần DataFrames:

def read_sql_chunked(query, conn, nrows, chunksize=1000): 
    start = 0 
    dfs = [] 
    while start < nrows: 
     df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn) 
     start += chunksize 
     dfs.append(df) 
     print "Events added: %s to %s of %s" % (start-chunksize, start, nrows) 
    # print "concatenating dfs" 
    return dfs 

event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000) 

và hoạt động tốt, nhưng khi cố ghép nối các DataFrames, hạt nhân lại chết.
Và đây là sau khi cung cấp cho máy ảo 2GB RAM.

Dựa trên lời giải thích của Andy về read_sql so vớiread_csv sự khác biệt trong việc thực hiện và hiệu suất, điều tiếp theo tôi đã cố gắng là để thêm các hồ sơ vào một CSV và sau đó đọc tất cả chúng vào một DataFrame:

event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8') 

for df in event_dfs[1:]: 
    df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8') 

Một lần nữa, các văn bản cho CSV hoàn tất thành công - một tập tin 657MB - nhưng việc đọc từ CSV không bao giờ hoàn thành.

Làm cách nào để có thể ước tính số lượng RAM đủ để đọc cho biết tệp CSV 657MB, vì 2GB dường như chưa đủ?


Cảm thấy như tôi đang thiếu một số hiểu biết cơ bản của một trong hai DataFrames hoặc psycopg2, nhưng tôi gặp khó khăn, tôi thậm chí không thể xác định được nút cổ chai hoặc nơi để tối ưu hóa.

Chiến lược thích hợp để lấy lượng dữ liệu lớn hơn từ máy chủ từ xa (postgres) là gì?

+0

Là một kinh nghiệm này sucks Hy vọng chúng tôi có thể có được điều này làm việc cho bạn trong! Sự tò mò về mức độ lớn của bảng của bạn là bao nhiêu hàng? –

+0

@AndyHayden được cập nhật để thêm vào số lượng bản ghi và số lượng thuộc tính cho mỗi bảng trong bảng _events_ –

+0

Bạn có * cần * tất cả dữ liệu tại một lần trong bộ nhớ? Hoặc là nó đủ để chỉ có một phần của dữ liệu (ví dụ như cột nhất định) cùng một lúc trong một DataFrame? (nhưng ngoài ra, câu hỏi của bạn về một dataframe lớn như thế nào có thể là hợp pháp tất nhiên) – joris

Trả lời

4

Tôi nghi ngờ có một vài (liên quan) điều tại chơi ở đây gây ra sự chậm chạp:

  1. read_sql được viết bằng python vì vậy nó một chút chậm (đặc biệt là so với read_csv, được viết bằng cython - và cẩn thận thực hiện cho tốc độ!) và nó dựa trên sqlalchemy hơn là một số (có khả năng nhanh hơn) C-DBAPI. Động lực để chuyển sang sqlalchmey là làm cho việc di chuyển đó dễ dàng hơn trong tương lai (cũng như hỗ trợ đa nền tảng).
  2. Bạn có thể chạy ra khỏi bộ nhớ khi quá nhiều đối tượng python là trong bộ nhớ (điều này có liên quan đến không sử dụng C-DBAPI), nhưng khả năng có thể được giải quyết ...

Tôi nghĩ rằng ngay lập tức giải pháp là một cách tiếp cận dựa trên chunk (và có một feature request để có tác phẩm này tự nhiên trong gấu trúc read_sqlread_sql_table).

CHỈNH SỬA: Như của Pandas v0.16.2 phương pháp tiếp cận dựa trên đoạn này được thực hiện nguyên bản trong read_sql.


Vì bạn đang sử dụng postgres, bạn có quyền truy cập vào LIMIT and OFFSET queries, điều này làm cho việc chia nhỏ trở nên dễ dàng. (Tôi có ngay trong suy nghĩ đây không phải là có sẵn trong tất cả các ngôn ngữ sql?)

Đầu tiên, có được số hàng (hoặc một estimate) trong bảng của bạn:

nrows = con.execute('SELECT count(*) FROM users').fetchone()[0] # also works with an sqlalchemy engine 

Sử dụng này để lặp qua bảng (để gỡ lỗi bạn có thể thêm một số báo cáo in để xác nhận rằng nó đã làm việc/không bị rơi!) và sau đó kết hợp các kết quả:

def read_sql_chunked(query, con, nrows, chunksize=1000): 
    start = 1 
    dfs = [] # Note: could probably make this neater with a generator/for loop 
    while start < nrows: 
     df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con) 
     dfs.append(df) 
    return pd.concat(dfs, ignore_index=True) 

Lưu ý: đây giả định rằng các cơ sở dữ liệu phù hợp trong bộ nhớ! Nếu không, bạn sẽ cần phải làm việc trên từng đoạn (phong cách mapreduce) ... hoặc đầu tư vào nhiều bộ nhớ hơn!

+0

Bộ nhớ có thể rất là nút cổ chai: Tôi đang chạy một máy ảo chỉ có 512M mặc định. Nhanh chóng chạm tới 1024M và nếu điều đó không hiệu quả, tôi sẽ thử đọc đoạn trích đó. –

+0

@MariusButuc cho tôi biết hội chợ giải pháp này như thế nào nếu bạn có bất kỳ vấn đề gì! –

+0

đã thêm _Update # 3_ với các nỗ lực mới (vẫn kém thành công) của tôi. –

0

cố gắng sử dụng gấu trúc:

* mysql_cn = mysql.connector.connect (host = 'localhost', port = 123, user = 'xyz', passwd = '****', db = 'xy_db') **

dữ liệu = pd.read_sql ('SELECT * FROM table;', con = mysql_cn

mysql_cn.close()

nó làm việc cho tôi

Các vấn đề liên quan