2012-07-30 20 views
87

Tôi thường cần áp dụng một hàm cho các nhóm rất lớn DataFrame (của các kiểu dữ liệu hỗn hợp) và muốn tận dụng nhiều lõi.Áp dụng hiệu quả chức năng cho một khung hình dữ liệu được ghép theo song song

Tôi có thể tạo trình lặp từ nhóm và sử dụng mô-đun đa xử lý, nhưng không hiệu quả vì mỗi nhóm và kết quả của hàm phải được chọn để gửi tin nhắn giữa các quá trình.

Có cách nào để tránh tẩy hoặc thậm chí tránh việc sao chép hoàn toàn DataFrame không? Có vẻ như các chức năng bộ nhớ dùng chung của các mô-đun đa xử lý được giới hạn ở các mảng numpy. Có sự lựa chọn nào khác không?

+0

Theo như tôi biết, không có cách nào để chia sẻ các đối tượng tùy ý. Tôi tự hỏi, nếu việc tẩy rửa mất nhiều thời gian hơn nhiều so với việc đạt được thông qua xử lý đa. Có lẽ bạn nên tìm kiếm một khả năng để tạo ra các gói công việc lớn hơn cho mỗi quy trình để giảm thời gian tẩy tương đối. Một khả năng khác là sử dụng đa xử lý khi bạn tạo nhóm. –

+3

Tôi làm một cái gì đó như thế nhưng sử dụng UWSGI, Flask và preforking: Tôi tải khung dữ liệu gấu trúc vào một quá trình, chia nó x lần (tạo cho nó một đối tượng bộ nhớ chia sẻ) và sau đó gọi các quá trình đó từ một quá trình python khác. atm tôi sử dụng JSON như là một quá trình giao tiếp, nhưng điều này đang đến (vẫn còn rất thử nghiệm): http://pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental – Carst

+0

Nhân tiện, bạn có bao giờ nhìn vào HDF5 với chunking? (HDF5 không được lưu để viết đồng thời, nhưng bạn cũng có thể lưu vào các tệp riêng biệt và trong nội dung kết hợp) – Carst

Trả lời

12

Từ các nhận xét ở trên, có vẻ như đây là kế hoạch cho pandas một thời gian (cũng có một thú vị-tìm kiếm rosetta project mà tôi vừa nhận thấy).

Tuy nhiên, cho đến khi tất cả các chức năng song song được đưa vào pandas, tôi nhận thấy rằng nó rất dễ dàng để viết hiệu quả & không nhớ sao chép augmentations song song với pandas trực tiếp sử dụng cython + OpenMP và C++.

Dưới đây là một ví dụ ngắn viết một song song groupby-sum, mà sử dụng là một cái gì đó như thế này:

import pandas as pd 
import para_group_demo 

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) 
print para_group_demo.sum(df.a, df.b) 

và đầu ra là:

 sum 
key  
0  6 
1  11 
2  4 

Note Không nghi ngờ gì, đây chức năng của ví dụ đơn giản cuối cùng sẽ là một phần của pandas. Một số điều, tuy nhiên, sẽ tự nhiên hơn để song song trong C++ một thời gian, và điều quan trọng là phải nhận thức được cách dễ dàng để kết hợp điều này vào pandas.


Để làm điều này, tôi đã viết một phần mở rộng tệp đơn nguồn đơn giản có mã sau.

Nó bắt đầu với một số hàng nhập khẩu và định nghĩa kiểu

from libc.stdint cimport int64_t, uint64_t 
from libcpp.vector cimport vector 
from libcpp.unordered_map cimport unordered_map 

cimport cython 
from cython.operator cimport dereference as deref, preincrement as inc 
from cython.parallel import prange 

import pandas as pd 

ctypedef unordered_map[int64_t, uint64_t] counts_t 
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t 
ctypedef vector[counts_t] counts_vec_t 

Loại C++ unordered_map là để tổng hợp bởi một chủ đề duy nhất, và vector là để tổng hợp của tất cả các chủ đề.

Hiện tại với chức năng sum.Nó bắt đầu với typed memory views để truy cập nhanh:

def sum(crit, vals): 
    cdef int64_t[:] crit_view = crit.values 
    cdef int64_t[:] vals_view = vals.values 

Chức năng tiếp tục bằng cách chia bán như nhau đối với các chủ đề (ở đây hardcoded đến 4), và có mỗi thread tổng các mục trong phạm vi của nó:

cdef uint64_t num_threads = 4 
    cdef uint64_t l = len(crit) 
    cdef uint64_t s = l/num_threads + 1 
    cdef uint64_t i, j, e 
    cdef counts_vec_t counts 
    counts = counts_vec_t(num_threads) 
    counts.resize(num_threads) 
    with cython.boundscheck(False): 
     for i in prange(num_threads, nogil=True): 
      j = i * s 
      e = j + s 
      if e > l: 
       e = l 
      while j < e: 
       counts[i][crit_view[j]] += vals_view[j] 
       inc(j) 

Khi đề đã hoàn thành, chức năng kết hợp tất cả các kết quả (từ phạm vi khác nhau) thành một đơn unordered_map:

cdef counts_t total 
    cdef counts_it_t it, e_it 
    for i in range(num_threads): 
     it = counts[i].begin() 
     e_it = counts[i].end() 
     while it != e_it: 
      total[deref(it).first] += deref(it).second 
      inc(it)   

Tất cả các thứ ở bên trái là tạo một DataFrame và trả lại kết quả:

key, sum_ = [], [] 
    it = total.begin() 
    e_it = total.end() 
    while it != e_it: 
     key.append(deref(it).first) 
     sum_.append(deref(it).second) 
     inc(it) 

    df = pd.DataFrame({'key': key, 'sum': sum_}) 
    df.set_index('key', inplace=True) 
    return df 
Các vấn đề liên quan