2015-07-28 23 views
8

Trước câu hỏi của tôi:tính toán song song trong Julia với dữ liệu lớn

  • là nó có thể để ngăn chặn Julia sao chép các biến mỗi lần trong một song song cho vòng lặp?
  • nếu không, làm thế nào để thực hiện một hoạt động giảm song song ở Julia?

Bây giờ các chi tiết:

tôi có chương trình này:

data = DataFrames.readtable("...") # a big baby (~100MB) 
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame 
filtered_data = @parallel vcat for fct in filter_functions 
    fct(data)::DataFrame 
end 

Nó hoạt động chức năng đẹp khôn ngoan, nhưng mỗi cuộc gọi song song với FCT (dữ liệu) trên một bản sao công nhân toàn bộ khung dữ liệu, làm mọi thứ đau đớn chậm.

Lý tưởng nhất, tôi muốn tải dữ liệu một lần và luôn sử dụng từng dữ liệu trên mỗi công nhân dữ liệu được tải sẵn. tôi đã đưa ra đoạn mã này để làm như vậy:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB) 
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame 
@everywhere for i in 1:length(filter_functions) 
    if (myid()-1) % nworkers() 
    fct = filter_functions[i] 
    filtered_data_temp = fct(data) 
    end 
    # How to vcat all the filtered_data_temp ? 
end 

Nhưng bây giờ tôi có một vấn đề khác: Tôi không thể tìm ra cách để VCAT() tất cả các filtered_data_temp vào một biến trong người lao động với myid() == 1 .

Tôi rất cảm kích mọi thông tin chi tiết.

Lưu ý: Tôi biết về Operating in parallel on a large constant datastructure in Julia. Tuy nhiên, tôi không tin rằng nó áp dụng cho vấn đề của tôi bởi vì tất cả các filter_functions của tôi hoạt động trên mảng như một toàn thể.

Trả lời

4

Sau khi tất cả, Tôi tìm thấy ở đó giải pháp cho câu hỏi của tôi: Julia: How to copy data to another processor in Julia.

Đặc biệt, nó giới thiệu nguyên thủy sau đây để lấy một biến từ quá trình khác:

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm))) 

Dưới đây là cách tôi đang sử dụng nó:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB) 
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame 
# Executes the filter functions 
@everywhere for i in 1:length(filter_functions) 
    local_results = ... # some type 
    if (myid()-1) % nworkers() 
    fct = filter_functions[i] 
    filtered_data_temp = fct(data) 
    local_results = vcat(local_results, filtered_data_temp) 
    end 
    # How to vcat all the filtered_data_temp ? 
end 
# Concatenate all the local results 
all_results = ... # some type 
for wid in 1:workers() 
    worker_local_results = getfrom(wid, :local_results) 
    all_results = vcat(all_results,worker_local_results) 
end 
+0

Sau đó, có vẻ như tôi cũng có thể sử dụng pmap(). –

10

Bạn có thể muốn nhìn vào/tải dữ liệu của bạn vào Distributed Arrays

EDIT: Có lẽ một cái gì đó như thế này:

data = DataFrames.readtable("...") 
dfiltered_data = distribute(data) #distributes data among processes automagically 
filter_functions = [ fct1, fct2, fct3 ... ] 
for fct in filter_functions 
    dfiltered_data = fct(dfiltered_data)::DataFrame 
end 

Bạn cũng có thể kiểm tra unit tests để biết thêm ví dụ

+0

Nó có vẻ tốt đẹp. Để tôi kiểm tra. –

+2

Bạn cũng có thể muốn xem xét 'SharedArray's, nếu tất cả dữ liệu của bạn bắt đầu trên một quy trình và bạn không muốn trả giá để chuyển chúng sang một quy trình khác. – tholy

+0

Đẹp, nhưng (1) Tôi nghĩ rằng nó sử dụng bộ nhớ chia sẻ, có thể không hoạt động trên các cụm phân tán (2) "phân phối" dường như không hỗ trợ dataframe (chỉ mảng). Kể từ đó, ví dụ của bạn cũng như các liên kết đến các bài kiểm tra đơn vị đã rất khai sáng. –

Các vấn đề liên quan