2011-12-13 30 views
24

Tôi mới làm quen với đối tượng python và tôi đang viết lại ứng dụng hiện có của mình như là một phiên bản hướng đối tượng, vì bây giờ các nhà phát triển đang tăng lên và mã của tôi đang trở nên không thể bảo trì.python subclassing multiprocessing.Process

Thông thường tôi sử dụng hàng đợi đa nhưng tôi tìm thấy từ ví dụ này http://www.doughellmann.com/PyMOTW/multiprocessing/basics.html mà tôi có thể phân lớp multiprocessing.Process vì vậy tôi nghĩ rằng đó là một ý tưởng tốt và tôi đã viết một lớp để kiểm tra như thế này:

mã:

from multiprocessing import Process 
class Processor(Process): 
    def return_name(self): 
     return "Process %s" % self.name 
    def run(self): 
     return self.return_name() 

processes = [] 


if __name__ == "__main__": 

     for i in range(0,5): 
       p=Processor() 
       processes.append(p) 
       p.start() 
     for p in processes: 
       p.join() 

Tuy nhiên tôi không thể lấy lại các giá trị, làm thế nào tôi có thể sử dụng hàng đợi theo cách này?

CHỈNH SỬA: Tôi muốn nhận giá trị trả lại và suy nghĩ vị trí đặt Queues().

Trả lời

28

subclassing multiprocessing.Process:

Tuy nhiên tôi không thể lấy lại giá trị, làm thế nào tôi có thể sử dụng hàng đợi theo cách này?

Process cần một Queue() để nhận được kết quả ... Một ví dụ về làm thế nào để phân lớp multiprocessing.Process sau ...

from multiprocessing import Process, Queue 
class Processor(Process): 

    def __init__(self, queue, idx, **kwargs): 
     super(Processor, self).__init__() 
     self.queue = queue 
     self.idx = idx 
     self.kwargs = kwargs 

    def run(self): 
     """Build some CPU-intensive tasks to run via multiprocessing here.""" 
     hash(self.kwargs) # Shameless usage of CPU for no gain... 

     ## Return some information back through multiprocessing.Queue 
     ## NOTE: self.name is an attribute of multiprocessing.Process 
     self.queue.put("Process idx={0} is called '{1}'".format(self.idx, self.name)) 

if __name__ == "__main__": 
    NUMBER_OF_PROCESSES = 5 

    ## Create a list to hold running Processor object instances... 
    processes = list() 

    q = Queue() # Build a single queue to send to all process objects... 
    for i in range(0, NUMBER_OF_PROCESSES): 
     p=Processor(queue=q, idx=i) 
     p.start() 
     processes.append(p) 

    # Incorporating ideas from this answer, below... 
    # https://stackoverflow.com/a/42137966/667301 
    [proc.join() for proc in processes] 
    while not q.empty(): 
     print "RESULT: {0}".format(q.get()) # get results from the queue... 

Trên máy tính của tôi, điều này dẫn đến ...

$ python test.py 
RESULT: Process idx=0 is called 'Processor-1' 
RESULT: Process idx=4 is called 'Processor-5' 
RESULT: Process idx=3 is called 'Processor-4' 
RESULT: Process idx=1 is called 'Processor-2' 
RESULT: Process idx=2 is called 'Processor-3' 
$ 


Sử dụng multiprocessing.Pool:

FWIW, một trong những bất lợi mà tôi đã tìm thấy để phân lớp multiprocessing.Process là bạn không thể tận dụng tất cả sự tích hợp sẵn trong số multiprocessing.Pool; Pool cung cấp cho bạn API rất đẹp nếu bạn không cần mã sản xuất và mã người tiêu dùng của bạn để nói chuyện với nhau thông qua hàng đợi.

Bạn có thể làm được rất nhiều chỉ với một số giá trị trả về sáng tạo ... trong ví dụ sau, tôi sử dụng một dict() để đóng gói các giá trị đầu vào và đầu ra từ pool_job() ...

from multiprocessing import Pool 

def pool_job(input_val=0): 
    # FYI, multiprocessing.Pool can't guarantee that it keeps inputs ordered correctly 
    # dict format is {input: output}... 
    return {'pool_job(input_val={0})'.format(input_val): int(input_val)*12} 

pool = Pool(5) # Use 5 multiprocessing processes to handle jobs... 
results = pool.map(pool_job, xrange(0, 12)) # map xrange(0, 12) into pool_job() 
print results 

Điều này dẫn đến:

[ 
    {'pool_job(input_val=0)': 0}, 
    {'pool_job(input_val=1)': 12}, 
    {'pool_job(input_val=2)': 24}, 
    {'pool_job(input_val=3)': 36}, 
    {'pool_job(input_val=4)': 48}, 
    {'pool_job(input_val=5)': 60}, 
    {'pool_job(input_val=6)': 72}, 
    {'pool_job(input_val=7)': 84}, 
    {'pool_job(input_val=8)': 96}, 
    {'pool_job(input_val=9)': 108}, 
    {'pool_job(input_val=10)': 120}, 
    {'pool_job(input_val=11)': 132} 
] 

Rõ ràng có nhiều cải tiến khác sẽ được thực hiện trong pool_job(), chẳng hạn như xử lý lỗi, nhưng điều này minh họa các yếu tố cần thiết. FYI, this answer cung cấp một ví dụ khác về cách sử dụng multiprocessing.Pool.

+0

Vì vậy, trong một trong những phương thức phải chấp nhận đối tượng Queue là tham số đúng không? –

+0

Xong! tôi đã tạo một phương thức init chấp nhận các hàng đợi. điều này trong lượt mở rộng multiprocessing.Process để chấp nhận Hàng đợi trực tiếp :) –

+0

Cảm ơn sự sửa chữa. Mã này 'return self.queue.put (self.return_name())' trả về một hàng đợi? –

2

Giá trị trả lại là Process.run không được chuyển đến bất kỳ đâu. Bạn cần phải gửi chúng trở lại quy trình gốc, ví dụ: sử dụng số multiprocessing.Queue (docs here).

2

Cảm ơn rất nhiều người.

Bây giờ heres làm thế nào tôi đã nhận nó làm :)

Trong ví dụ này tôi sử dụng nhiều queus như tôi không muốn giao tiếp giữa mỗi ohter nhưng chỉ với quá trình cha mẹ.

from multiprocessing import Process,Queue 
class Processor(Process): 
    def __init__(self,queue): 
     Process.__init__(self) 
     self.que=queue 
    def get_name(self): 
     return "Process %s" % self.name 
    def run(self): 
     self.que.put(self.get_name()) 



if __name__ == "__main__": 

     processes = [] 
     for i in range(0,5): 
       p=Processor(Queue()) 
       processes.append(p) 
       p.start() 
     for p in processes: 
       p.join() 
       print p.que.get() 
+0

Vui lòng xem lại mã của tôi và cho tôi biết những gì tôi có thể cải thiện để có nhiều thực hành tốt hơn. –

+0

bạn nên sử dụng 'super()' ... xem bài đăng của tôi ... –

+0

THanks tôi sẽ kiểm tra. Nhưng tôi đã đọc supers là nguy hiểm đặc biệt là nhiều thừa kế? điều đó có đúng không? –

2

Mike's answer là tốt nhất, nhưng chỉ cho đầy đủ Tôi muốn đề cập đến mà tôi thích thu hoạch hàng đợi ra khỏi join bối cảnh nên các bit cuối cùng sẽ trông như thế này:

[proc.join() for proc in processes] # 1. join 

while not q.empty(): # 2. get the results 
    print "RESULT: %s" % q.get() 
Các vấn đề liên quan