2014-12-05 24 views
27

Tôi đang cố viết một ứng dụng đồng thời áp dụng một chức năng với multiprocessing.Pool. Tôi muốn hàm này là một phương thức cá thể (vì vậy tôi có thể định nghĩa nó một cách khác nhau trong các lớp con khác nhau). Điều này dường như không thể; như tôi đã học ở nơi khác, rõ ràng là bound methods can't be pickled. Vì vậy, tại sao bắt đầu một multiprocessing.Process với một phương pháp ràng buộc như một công việc mục tiêu? Các mã sau đây:Tại sao tôi có thể vượt qua một phương pháp thể hiện để đa xử lý. Xử lý, nhưng không phải là đa xử lý.

import multiprocessing 

def test1(): 
    print "Hello, world 1" 

def increment(x): 
    return x + 1 

class testClass(): 
    def process(self): 
     process1 = multiprocessing.Process(target=test1) 
     process1.start() 
     process1.join() 
     process2 = multiprocessing.Process(target=self.test2) 
     process2.start() 
     process2.join() 

    def pool(self): 
     pool = multiprocessing.Pool(1) 
     for answer in pool.imap(increment, range(10)): 
      print answer 
     print 
     for answer in pool.imap(self.square, range(10)): 
      print answer 

    def test2(self): 
     print "Hello, world 2" 

    def square(self, x): 
     return x * x 

def main(): 
    c = testClass() 
    c.process() 
    c.pool() 

if __name__ == "__main__": 
    main() 

Tạo đầu ra này:

Hello, world 1 
Hello, world 2 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner 
    self.run() 
    File "C:\Python27\Lib\threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

Tại sao Processes có thể xử lý ràng buộc các phương pháp, nhưng không bơi?

+0

Đó là vì chúng không thể được tuần tự hóa bởi 'pickle'. Nếu bạn cần phải ở trong python2.7, và bạn cần phải làm cho mã của bạn làm việc như là ... bạn nên sử dụng một ngã ba 'multiprocessing' có thể chọn phương pháp dụ và có thể chọn một' Pool'. Nhìn vào 'pathos.multiprocessing', mà bạn có thể tìm thấy trong liên kết stackoverflow bạn trích dẫn trong bài viết ở trên. –

+0

Cụ thể hơn, liên kết này cho thấy cách các phương thức thể hiện trong 2.x có thể được tuần tự hóa theo thứ tự trong một 'Pool': http://stackoverflow.com/a/21345273/2379433 –

+0

Liệu nó có phải là phương thức cá thể không? Bạn có thể sử dụng classmethod? Tôi đã thử nó và làm việc tốt cho tôi. –

Trả lời

20

phương pháp dụ Module pickle thường không thể dưa:

>>> import pickle 
>>> class A(object): 
... def z(self): print "hi" 
... 
>>> a = A() 
>>> pickle.dumps(a.z) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
    raise TypeError, "can't pickle %s objects" % base.__name__ 
TypeError: can't pickle instancemethod objects 

Tuy nhiên, các mô-đun multiprocessinghas a custom Pickler that adds some code to enable this feature:

# 
# Try making some callable types picklable 
# 

from pickle import Pickler 
class ForkingPickler(Pickler): 
    dispatch = Pickler.dispatch.copy() 

    @classmethod 
    def register(cls, type, reduce): 
     def dispatcher(self, obj): 
      rv = reduce(obj) 
      self.save_reduce(obj=obj, *rv) 
     cls.dispatch[type] = dispatcher 

def _reduce_method(m): 
    if m.im_self is None: 
     return getattr, (m.im_class, m.im_func.func_name) 
    else: 
     return getattr, (m.im_self, m.im_func.func_name) 
ForkingPickler.register(type(ForkingPickler.save), _reduce_method) 

Bạn có thể tái tạo này bằng cách sử dụng mô-đun copy_reg để xem nó hoạt động cho chính mình :

>>> import copy_reg 
>>> def _reduce_method(m): 
...  if m.im_self is None: 
...   return getattr, (m.im_class, m.im_func.func_name) 
...  else: 
...   return getattr, (m.im_self, m.im_func.func_name) 
... 
>>> copy_reg.pickle(type(a.z), _reduce_method) 
>>> pickle.dumps(a.z) 
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n." 

Khi bạn sử dụng Process.start để đẻ trứng một quá trình mới trên Windows, it pickles all the parameters you passed to the child process using this custom ForkingPickler:

# 
# Windows 
# 

else: 
    # snip... 
    from pickle import load, HIGHEST_PROTOCOL 

    def dump(obj, file, protocol=None): 
     ForkingPickler(file, protocol).dump(obj) 

    # 
    # We define a Popen class similar to the one from subprocess, but 
    # whose constructor takes a process object as its argument. 
    # 

    class Popen(object): 
     ''' 
     Start a subprocess to run the code of a process object 
     ''' 
     _tls = thread._local() 

     def __init__(self, process_obj): 
      # create pipe for communication with child 
      rfd, wfd = os.pipe() 

      # get handle for read end of the pipe and make it inheritable 
      ... 
      # start process 
      ... 

      # set attributes of self 
      ... 

      # send information to child 
      prep_data = get_preparation_data(process_obj._name) 
      to_child = os.fdopen(wfd, 'wb') 
      Popen._tls.process_handle = int(hp) 
      try: 
       dump(prep_data, to_child, HIGHEST_PROTOCOL) 
       dump(process_obj, to_child, HIGHEST_PROTOCOL) 
      finally: 
       del Popen._tls.process_handle 
       to_child.close() 

Note "gửi thông tin cho trẻ em" phần. Nó sử dụng hàm dump, sử dụng ForkingPickler để chọn dữ liệu, có nghĩa là phương pháp thể hiện của bạn có thể được chọn.

Bây giờ, khi bạn sử dụng các phương thức trên multiprocessing.Pool để gửi phương thức cho quy trình con, nó sử dụng multiprocessing.Pipe để chọn dữ liệu. Trong Python 2.7, multiprocessing.Pipe được triển khai trong C, and calls pickle_dumps directly, do đó, nó không tận dụng lợi thế của ForkingPickler. Điều đó có nghĩa là tẩy phương pháp thể hiện không hoạt động.

Tuy nhiên, nếu bạn sử dụng copy_reg đăng ký kiểu instancemethod, chứ không phải là một phong tục Pickler, tất cả nỗ lực tẩy sẽ bị ảnh hưởng. Vì vậy, bạn có thể sử dụng để cho phép tẩy phương pháp dụ, thậm chí qua Pool:

import multiprocessing 
import copy_reg 
import types 

def _reduce_method(m): 
    if m.im_self is None: 
     return getattr, (m.im_class, m.im_func.func_name) 
    else: 
     return getattr, (m.im_self, m.im_func.func_name) 
copy_reg.pickle(types.MethodType, _reduce_method) 

def test1(): 
    print("Hello, world 1") 

def increment(x): 
    return x + 1 

class testClass(): 
    def process(self): 
     process1 = multiprocessing.Process(target=test1) 
     process1.start() 
     process1.join() 
     process2 = multiprocessing.Process(target=self.test2) 
     process2.start() 
     process2.join() 

    def pool(self): 
     pool = multiprocessing.Pool(1) 
     for answer in pool.imap(increment, range(10)): 
      print(answer) 
     print 
     for answer in pool.imap(self.square, range(10)): 
      print(answer) 

    def test2(self): 
     print("Hello, world 2") 

    def square(self, x): 
     return x * x 

def main(): 
    c = testClass() 
    c.process() 
    c.pool() 

if __name__ == "__main__": 
    main() 

Output:

Hello, world 1 
Hello, world 2 
GOT (0, 0, (True, 1)) 
GOT (0, 1, (True, 2)) 
GOT (0, 2, (True, 3)) 
GOT (0, 3, (True, 4)) 
GOT (0, 4, (True, 5)) 
1GOT (0, 5, (True, 6)) 

GOT (0, 6, (True, 7)) 
2 
GOT (0, 7, (True, 8)) 
3 
GOT (0, 8, (True, 9)) 
GOT (0, 9, (True, 10)) 
4 
5 
6 
7 
8 
9 
10 

GOT (1, 0, (True, 0)) 
0 
GOT (1, 1, (True, 1)) 
1 
GOT (1, 2, (True, 4)) 
4 
GOT (1, 3, (True, 9)) 
9 
GOT (1, 4, (True, 16)) 
16 
GOT (1, 5, (True, 25)) 
25 
GOT (1, 6, (True, 36)) 
36 
GOT (1, 7, (True, 49)) 
49 
GOT (1, 8, (True, 64)) 
64 
GOT (1, 9, (True, 81)) 
81 
GOT None 

Cũng lưu ý rằng trong Python 3.x, pickle thể dưa các loại phương pháp dụ natively, vì vậy không công cụ này quan trọng hơn nữa. :)

+0

Cảm ơn bạn đã đề xuất; Tôi ngạc nhiên khi mô-đun đa xử lý không thực hiện điều này. Giải pháp chính xác của bạn không hiệu quả với tôi bởi vì nó liên quan đến việc tẩy trắng thể hiện mà phương thức được ràng buộc, gây ra các vấn đề khác, nhưng nó chỉ cho tôi đi đúng hướng. Thay vào đó, tôi định nghĩa các phương thức để chạy trong quá trình đa xử lý ở cấp cao nhất của một mô-đun để phá vỡ cả hai vấn đề và nhận được hành vi mà tôi muốn. – dpitch40

7

Đây là giải pháp thay thế mà đôi khi tôi sử dụng và nó hoạt động trong Python2.x:

Bạn có thể tạo một top-level "bí danh" của các loại phương pháp dụ, mà chấp nhận một đối tượng có phương pháp dụ bạn muốn chạy trong một hồ bơi, và có nó gọi phương pháp dụ cho bạn:

import functools 
import multiprocessing 

def _instance_method_alias(obj, arg): 
    """ 
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool 
    """ 
    obj.instance_method(arg) 
    return 

class MyClass(object): 
    """ 
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool 
    """ 

    def __init__(self): 
     self.my_string = "From MyClass: {}" 

    def instance_method(self, arg): 
     """ 
     Some arbitrary instance method 
     """ 

     print(self.my_string.format(arg)) 
     return 

# create an object of MyClass 
obj = MyClass() 

# use functools.partial to create a new method that always has the 
# MyClass object passed as its first argument 
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj) 

# create our list of things we will use the pool to map 
l = [1,2,3] 

# create the pool of workers 
pool = multiprocessing.Pool() 

# call pool.map, passing it the newly created function 
pool.map(_bound_instance_method_alias, l) 

# cleanup 
pool.close() 
pool.join() 

mã này tạo ra sản lượng này:

Từ MyClass: 1
Từ MyClass: 2
Từ MyClass: 3

01.

Một hạn chế là bạn không thể sử dụng điều này cho các phương pháp sửa đổi đối tượng. Mỗi quá trình sẽ nhận được một bản sao của đối tượng mà nó đang gọi các phương thức trên, vì vậy các thay đổi sẽ không được truyền lại cho quá trình chính. Nếu bạn không cần sửa đổi đối tượng từ các phương thức mà bạn đang gọi, thì đây có thể là một giải pháp đơn giản.

+1

Cảm ơn bạn đã đăng bài, có ý nghĩa với tôi sau khi lặn vào dưa muối và điều này làm việc cho tôi. Python 3 (cuối cùng) sẽ thu hẹp khoảng cách này. Chúc mừng! – leomelzer

4

Đây là cách làm việc dễ dàng hơn trong Python 2, chỉ cần bọc phương thức thể hiện gốc. Hoạt động tốt trên MacOSX và Linux, không hoạt động trên Windows, được thử nghiệm Python 2.7

from multiprocessing import Pool 

class Person(object): 
    def __init__(self): 
     self.name = 'Weizhong Tu' 

    def calc(self, x): 
     print self.name 
     return x ** 5 


def func(x, p=Person()): 
    return p.calc(x) 


pool = Pool() 
print pool.map(func, range(10))