2012-01-27 29 views
26

Tôi đang cố gắng sử dụng một hồ bơi công nhân trong python bằng cách sử dụng các đối tượng Process. Mỗi nhân viên (một Process) thực hiện một số khởi tạo (mất một khoảng thời gian không nhỏ), được thông qua một loạt các công việc (lý tưởng là sử dụng map()), và trả về một cái gì đó. Không có thông tin liên lạc là cần thiết ngoài đó. Tuy nhiên, tôi không thể tìm ra cách sử dụng map() để sử dụng chức năng compute() của công nhân của tôi.python Pool with worker Quy trình

from multiprocessing import Pool, Process 

class Worker(Process): 
    def __init__(self): 
     print 'Worker started' 
     # do some initialization here 
     super(Worker, self).__init__() 

    def compute(self, data): 
     print 'Computing things!' 
     return data * data 

if __name__ == '__main__': 
    # This works fine 
    worker = Worker() 
    print worker.compute(3) 

    # workers get initialized fine 
    pool = Pool(processes = 4, 
       initializer = Worker) 
    data = range(10) 
    # How to use my worker pool? 
    result = pool.map(compute, data) 

Công việc có xếp hàng thay thế hay tôi có thể sử dụng map()?

+0

Tất cả các đối tượng quá trình đều có trạng thái. Bạn có thể muốn xóa từ đó khỏi tiêu đề. Cũng thế. 'compute' là một phương thức của một Worker. Trong các ví dụ, nó thường là một chức năng hoàn toàn độc lập. Tại sao không viết chức năng tính toán để đơn giản bao gồm cả khởi tạo và xử lý? –

+0

Đủ công bằng, cảm ơn. Việc khởi tạo mất một thời gian dài, vì vậy tôi chỉ muốn làm điều đó một lần cho mỗi quy trình công nhân. – Felix

+0

Bạn phải nhấn mạnh "được truyền một loạt công việc" một phần của câu hỏi. Vì điều đó không rõ ràng. –

Trả lời

50

Tôi khuyên bạn nên sử dụng Hàng đợi cho việc này.

class Worker(Process): 
    def __init__(self, queue): 
     super(Worker, self).__init__() 
     self.queue= queue 

    def run(self): 
     print 'Worker started' 
     # do some initialization here 

     print 'Computing things!' 
     for data in iter(self.queue.get, None): 
      # Use data 

Bây giờ bạn có thể bắt đầu một đống này, tất cả nhận công việc từ một hàng đợi đơn

request_queue = Queue() 
for i in range(4): 
    Worker(request_queue).start() 
for data in the_real_source: 
    request_queue.put(data) 
# Sentinel objects to allow clean shutdown: 1 per worker. 
for i in range(4): 
    request_queue.put(None) 

Đó là loại điều nên cho phép bạn để trừ dần nguyên giá khởi động đắt tiền trên nhiều lao động.

+0

Đó là những gì tôi đã tìm, cảm ơn! Tôi đã kết thúc bằng cách sử dụng một hàng đợi công việc (đầu vào) và hàng đợi kết quả (đầu ra) để đồng bộ hóa tất cả mọi thứ. – Felix

+0

bạn ví dụ là tuyệt vời, tôi cố gắng ngay bây giờ làm thế nào để nhập các đối tượng sentinel khi strg + c được nhấn mà không có một exepction – Dukeatcoding

+0

@ S.Lott: Nó không phải là hàng đợi không phải là pickle-thể? và đó là lý do tại sao bạn sử dụng [multiprocessing.Manager() .Queue] (http://stackoverflow.com/questions/3217002/how-do-you-pass-a-queue-reference-to-a-function-managed-by -pool-map-async)? – zuuz

4

initializer mong đợi một cuộc gọi có thể thực hiện tùy ý, chẳng hạn như, nó có thể đặt một số hình cầu, không phải là lớp con Process; map chấp nhận một tùy ý có thể lặp lại:

#!/usr/bin/env python 
import multiprocessing as mp 

def init(val): 
    print('do some initialization here') 

def compute(data): 
    print('Computing things!') 
    return data * data 

def produce_data(): 
    yield -100 
    for i in range(10): 
     yield i 
    yield 100 

if __name__=="__main__": 
    p = mp.Pool(initializer=init, initargs=('arg',)) 
    print(p.map(compute, produce_data()))