Từ các nhận xét ở trên, có vẻ như đây là kế hoạch cho pandas
một thời gian (cũng có một thú vị-tìm kiếm rosetta
project mà tôi vừa nhận thấy).
Tuy nhiên, cho đến khi tất cả các chức năng song song được đưa vào pandas
, tôi nhận thấy rằng nó rất dễ dàng để viết hiệu quả & không nhớ sao chép augmentations song song với pandas
trực tiếp sử dụng cython
+ OpenMP và C++.
Dưới đây là một ví dụ ngắn viết một song song groupby-sum, mà sử dụng là một cái gì đó như thế này:
import pandas as pd
import para_group_demo
df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)
và đầu ra là:
sum
key
0 6
1 11
2 4
Note Không nghi ngờ gì, đây chức năng của ví dụ đơn giản cuối cùng sẽ là một phần của pandas
. Một số điều, tuy nhiên, sẽ tự nhiên hơn để song song trong C++ một thời gian, và điều quan trọng là phải nhận thức được cách dễ dàng để kết hợp điều này vào pandas
.
Để làm điều này, tôi đã viết một phần mở rộng tệp đơn nguồn đơn giản có mã sau.
Nó bắt đầu với một số hàng nhập khẩu và định nghĩa kiểu
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
Loại C++ unordered_map
là để tổng hợp bởi một chủ đề duy nhất, và vector
là để tổng hợp của tất cả các chủ đề.
Hiện tại với chức năng sum
.Nó bắt đầu với typed memory views để truy cập nhanh:
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
Chức năng tiếp tục bằng cách chia bán như nhau đối với các chủ đề (ở đây hardcoded đến 4), và có mỗi thread tổng các mục trong phạm vi của nó:
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l/num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
Khi đề đã hoàn thành, chức năng kết hợp tất cả các kết quả (từ phạm vi khác nhau) thành một đơn unordered_map
:
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
Tất cả các thứ ở bên trái là tạo một DataFrame
và trả lại kết quả:
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df
Theo như tôi biết, không có cách nào để chia sẻ các đối tượng tùy ý. Tôi tự hỏi, nếu việc tẩy rửa mất nhiều thời gian hơn nhiều so với việc đạt được thông qua xử lý đa. Có lẽ bạn nên tìm kiếm một khả năng để tạo ra các gói công việc lớn hơn cho mỗi quy trình để giảm thời gian tẩy tương đối. Một khả năng khác là sử dụng đa xử lý khi bạn tạo nhóm. –
Tôi làm một cái gì đó như thế nhưng sử dụng UWSGI, Flask và preforking: Tôi tải khung dữ liệu gấu trúc vào một quá trình, chia nó x lần (tạo cho nó một đối tượng bộ nhớ chia sẻ) và sau đó gọi các quá trình đó từ một quá trình python khác. atm tôi sử dụng JSON như là một quá trình giao tiếp, nhưng điều này đang đến (vẫn còn rất thử nghiệm): http://pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental – Carst
Nhân tiện, bạn có bao giờ nhìn vào HDF5 với chunking? (HDF5 không được lưu để viết đồng thời, nhưng bạn cũng có thể lưu vào các tệp riêng biệt và trong nội dung kết hợp) – Carst