2014-08-28 21 views
29

Tôi cần một số cách để sử dụng hàm trong pool.map() chấp nhận nhiều tham số. Theo hiểu biết của tôi, hàm mục tiêu của pool.map() chỉ có thể có một tham số có thể lặp lại như một tham số nhưng có cách nào để tôi có thể chuyển các tham số khác không? Trong trường hợp này, tôi cần chuyển một vài biến cấu hình, như Lock() của tôi và ghi thông tin vào hàm đích.Chuyển nhiều tham số đến hàm pool.map() trong Python

Tôi đã cố gắng thực hiện một số nghiên cứu và tôi nghĩ rằng tôi có thể sử dụng một phần chức năng để làm cho nó hoạt động? Tuy nhiên, tôi không hoàn toàn hiểu được những công việc này như thế nào. Mọi sự trợ giúp sẽ rất được trân trọng! Dưới đây là một ví dụ đơn giản về những gì tôi muốn làm:

def target(items, lock): 
    for item in items: 
     # Do cool stuff 
     if (... some condition here ...): 
      lock.acquire() 
      # Write to stdout or logfile, etc. 
      lock.release() 

def main(): 
    iterable = [1, 2, 3, 4, 5] 
    pool = multiprocessing.Pool() 
    pool.map(target(PASS PARAMS HERE), iterable) 
    pool.close() 
    pool.join() 
+0

thảo luận ở đây: http://stackoverflow.com/question/5442910/python-multiprocessing-pool-map-for-multiple-arguments (Tôi đã sử dụng phương pháp "sao" của JF Sebastien thành công) – Roberto

+1

Xin vui lòng, bất cứ khi nào bạn sử dụng đa xử lý sử dụng mệnh đề try/finally, với close() và join() bên trong cuối cùng để đảm bảo các quá trình được đóng lại nếu một lỗi xảy ra. http://stackoverflow.com/questions/30506489/python-multiprocessing-leading-to-many-zombie-processes – zeehio

Trả lời

49

Bạn có thể sử dụng functools.partial cho điều này (như bạn nghi ngờ):

from functools import partial 

def target(lock, iterable_item): 
    for item in iterable_item: 
     # Do cool stuff 
     if (... some condition here ...): 
      lock.acquire() 
      # Write to stdout or logfile, etc. 
      lock.release() 

def main(): 
    iterable = [1, 2, 3, 4, 5] 
    pool = multiprocessing.Pool() 
    l = multiprocessing.Lock() 
    func = partial(target, l) 
    pool.map(func, iterable) 
    pool.close() 
    pool.join() 

Ví dụ:

def f(a, b, c): 
    print("{} {} {}".format(a, b, c)) 

def main(): 
    iterable = [1, 2, 3, 4, 5] 
    pool = multiprocessing.Pool() 
    a = "hi" 
    b = "there" 
    func = partial(f, a, b) 
    pool.map(func, iterable) 
    pool.close() 
    pool.join() 

if __name__ == "__main__": 
    main() 

Output:

hi there 1 
hi there 2 
hi there 3 
hi there 4 
hi there 5 
+0

Tuyệt vời, tôi nghĩ tất cả những gì tôi cần là một ví dụ rõ ràng như thế này. Cảm ơn một tấn! – DJMcCarthy12

+0

Ví dụ tuyệt vời. Tuy nhiên, một câu hỏi đặt ra: tại sao có một 'mục trong các mục:' trong định nghĩa của 'mục tiêu'? –

+0

@ Jean-FrancoisT. Sao chép/dán lỗi! Cảm ơn đã chỉ ra điều đó. – dano

0

Trong trường hợp bạn không có quyền truy cập vào functools.partial, bạn cũng có thể sử dụng chức năng trình bao bọc cho điều này.

def target(lock): 
    def wrapped_func(items): 
     for item in items: 
      # Do cool stuff 
      if (... some condition here ...): 
       lock.acquire() 
       # Write to stdout or logfile, etc. 
       lock.release() 
    return wrapped_func 

def main(): 
    iterable = [1, 2, 3, 4, 5] 
    pool = multiprocessing.Pool() 
    lck = multiprocessing.Lock() 
    pool.map(target(lck), iterable) 
    pool.close() 
    pool.join() 

Điều này làm cho target() thành một chức năng chấp nhận một khóa (hoặc bất kỳ thông số bạn muốn từ bỏ), và nó sẽ trở lại một chức năng mà chỉ mất trong một iterable như đầu vào, nhưng vẫn có thể sử dụng tất cả các thông số khác của bạn . Đó là những gì cuối cùng được thông qua vào pool.map(), mà sau đó nên thực hiện mà không có vấn đề gì.

+1

Tôi đang siêu muộn về điều này, nhưng mã này sẽ không hoạt động, bởi vì các chức năng lồng nhau không thể được ngâm. Việc gọi 'target (lck)' trả về hàm 'wrap_func' lồng nhau, mà cần phải được pickled để được truyền cho tiến trình công nhân, và điều đó sẽ luôn thất bại. – dano

3

Bạn có thể sử dụng chức năng bản đồ cho phép nhiều đối số, cũng như ngã ba của multiprocessing được tìm thấy trong pathos.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> 
>>> def add_and_subtract(x,y): 
... return x+y, x-y 
... 
>>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1)) 
>>> res 
[(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)] 
>>> Pool().map(add_and_subtract, *zip(*res)) 
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)] 

pathos phép bạn dễ dàng tổ bản đồ song song thứ bậc với nhiều đầu vào, vì vậy chúng tôi có thể mở rộng ví dụ để chứng minh rằng.

>>> from pathos.multiprocessing import ThreadingPool as TPool 
>>> 
>>> res = TPool().amap(add_and_subtract, *zip(*Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1)))) 
>>> res.get() 
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)] 

Thậm chí thú vị hơn, là xây dựng hàm lồng nhau mà chúng tôi có thể chuyển vào hồ bơi. Điều này là có thể vì pathos sử dụng dill, có thể tuần tự hóa hầu hết mọi thứ trong python.

>>> def build_fun_things(f, g): 
... def do_fun_things(x, y): 
...  return f(x,y), g(x,y) 
... return do_fun_things 
... 
>>> def add(x,y): 
... return x+y 
... 
>>> def sub(x,y): 
... return x-y 
... 
>>> neato = build_fun_things(add, sub) 
>>> 
>>> res = TPool().imap(neato, *zip(*Pool().map(neato, range(0,20,2), range(-5,5,1)))) 
>>> list(res) 
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)] 

Nếu bạn không thể đi ra ngoài thư viện chuẩn, tuy nhiên, bạn sẽ phải thực hiện theo cách khác. đặt cược tốt nhất của bạn trong trường hợp đó là sử dụng multiprocessing.starmap như đã thấy ở đây: Python multiprocessing pool.map for multiple arguments (ghi nhận của @Roberto trong các ý kiến ​​về bài đăng của OP)

Nhận pathos đây: https://github.com/uqfoundation

Các vấn đề liên quan