2011-12-15 43 views
10

Tôi đang sử dụng multiprocessing.Pool()multiprocessing.pool.map và chức năng với hai đối số

đây là những gì tôi muốn bơi:

def insert_and_process(file_to_process,db): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,file_list,db) # here having problem. 

Tôi muốn vượt qua 2 đối số gì tôi muốn làm là để chỉ khởi tạo 4 kết nối DB (ở đây sẽ cố gắng tạo kết nối trên mọi cuộc gọi hàm để có thể có hàng triệu cuộc gọi và khiến IO bị đóng băng đến chết). nếu tôi có thể tạo ra 4 kết nối db và 1 cho mỗi quá trình nó sẽ là ok.

Có giải pháp nào cho Hồ bơi không? hay tôi nên từ bỏ nó?

EDIT:

Từ sự giúp đỡ của cả hai bạn tôi đã nhận điều này bằng cách thực hiện điều này:

args=zip(f,cycle(dbs)) 
Out[-]: 
[('f1', 'db1'), 
('f2', 'db2'), 
('f3', 'db3'), 
('f4', 'db4'), 
('f5', 'db1'), 
('f6', 'db2'), 
('f7', 'db3'), 
('f8', 'db4'), 
('f9', 'db1'), 
('f10', 'db2'), 
('f11', 'db3'), 
('f12', 'db4')] 

Vì vậy, ở đây nó như thế nào nó sẽ làm việc, Tôi gonna di chuyển mã kết nối DB ra đến mức chính và làm điều này:

def process_and_insert(args): 

    #Table Definations 
    args[1].table.insert(**parse_file(args[0])) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 

    dbs = [DAL("path_to_mysql/database") for i in range(0,3)] 
    args=zip(file_list,cycle(dbs)) 
    P.map(insert_and_process,args) # here having problem. 

Vâng, tôi sẽ kiểm tra và cho các bạn biết.

Trả lời

26

các tài liệu Pool không nói ra một cách để đi qua nhiều hơn một tham số cho hàm mục tiêu - tôi đã cố gắng chỉ đi qua một chuỗi, nhưng không nhận được mở ra (một mục tương ứng của chuỗi cho mỗi tham số).

Tuy nhiên, bạn có thể viết hàm mục tiêu của bạn để mong đợi là người đầu tiên (và duy nhất) tham số là một tuple, trong đó mỗi phần tử là một trong những tham số mà bạn đang mong đợi:

from itertools import repeat 

def insert_and_process((file_to_process,db)): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,zip(file_list,repeat(db))) 

(lưu ý thêm dấu ngoặc đơn trong định nghĩa của insert_and_process - python coi đó là một thông số duy nhất phải là một chuỗi gồm 2 mục. Phần tử đầu tiên của chuỗi được quy cho biến đầu tiên và phần tử thứ hai là thứ tự)

+4

Lưu ý rằng cú pháp 'def f ((arg1, arg2)):' biến mất trong Python 3. –

+1

@FerdinandBeyer: Tôi đã quên điều đó. Vâng, trừ khi thực hiện multiprocessing.Pool.map là khác nhau ở đó, con đường để đi sẽ được giao cho một đối số duy nhất, và giải nén nó bên trong chức năng sau đó. – jsbueno

+0

Cảm ơn tôi đã làm việc đó! tôi đã nhận nó bằng cách làm zip (file_list, chu kỳ (dbs)). Nhưng tôi không sử dụng f ((arg1, arg2)). khi tôi sử dụng nhiều mã của bạn, tôi đã chọn bạn! –

8

Hồ bơi của bạn sẽ sinh ra bốn quy trình, mỗi tiến trình được thực hiện bởi phiên bản riêng của trình thông dịch Python. Bạn có thể sử dụng một biến toàn cục để giữ đối tượng kết nối cơ sở dữ liệu của bạn, do đó chính xác một kết nối được tạo ra cho mỗi quá trình:

global_db = None 

def insert_and_process(file_to_process, db): 
    global global_db 
    if global_db is None: 
     # If this is the first time this function is called within this 
     # process, create a new connection. Otherwise, the global variable 
     # already holds a connection established by a former call. 
     global_db = DAL("path_to_mysql" + db) 
    global_db.table.insert(**parse_file(file_to_process)) 
    return True 

Kể từ Pool.map() và bạn bè chỉ hỗ trợ chức năng lao động một đối số, bạn cần tạo một wrapper rằng tiền đạo công trình:

def insert_and_process_helper(args): 
    return insert_and_process(*args) 

if __name__ == "__main__": 
    file_list=os.listdir(".") 
    db = "wherever you get your db" 
    # Create argument tuples for each function call: 
    jobs = [(file, db) for file in file_list] 
    P = Pool(processes=4) 
    P.map(insert_and_process_helper, jobs) 
+0

Cảm ơn Ferdinand, điều này gần với những gì tôi muốn. Những gì tôi muốn làm là tạo ra 4 DB Connections. Một kết nối cho mỗi quy trình, nhưng không phải cho mọi cuộc gọi hàm. 'DAL (" Đường dẫn đến db ")' sẽ tạo một kết nối db. Kết nối đơn sẽ chậm hơn so với kết nối Quad cùng một lúc. –

+0

Tôi đã thử các ví dụ đó và nó hoạt động tốt khi chức năng không phải trả lại ...; Chúng ta không thể làm một cái gì đó như my_var = P.map (insert_and_process_helper, jobs)? – neverMind

+0

@neverMind tất nhiên bạn có thể –

5

Không cần sử dụng zip.Nếu ví dụ bạn có 2 thông số, x và y, và mỗi người trong số họ có thể nhận được nhiều giá trị, như:

X=range(1,6) 
Y=range(10) 

Các chức năng nên chỉ nhận một tham số, và giải nén nó bên trong:

def func(params): 
    (x,y)=params 
    ... 

và bạn gọi nó như thế:

params = [(x,y) for x in X for y in Y] 
pool.map(func, params) 
2

Sử dụng

params=[(x,y) for x in X for y in Y] 

bạn tạo một bản sao đầy đủ của xy, và điều đó có thể chậm hơn so với sử dụng

from itertools import repeat 
P.map(insert_and_process,zip(file_list,repeat(db))) 
1

Bạn có thể sử dụng thư viện

from functools import partial 

cho mục đích này

như

func = partial(rdc, lat, lng) 
r = pool.map(func, range(8)) 

def rdc(lat,lng,x): 
    pass 
Các vấn đề liên quan