subclassing multiprocessing.Process
:
Tuy nhiên tôi không thể lấy lại giá trị, làm thế nào tôi có thể sử dụng hàng đợi theo cách này?
Process cần một Queue()
để nhận được kết quả ... Một ví dụ về làm thế nào để phân lớp multiprocessing.Process
sau ...
from multiprocessing import Process, Queue
class Processor(Process):
def __init__(self, queue, idx, **kwargs):
super(Processor, self).__init__()
self.queue = queue
self.idx = idx
self.kwargs = kwargs
def run(self):
"""Build some CPU-intensive tasks to run via multiprocessing here."""
hash(self.kwargs) # Shameless usage of CPU for no gain...
## Return some information back through multiprocessing.Queue
## NOTE: self.name is an attribute of multiprocessing.Process
self.queue.put("Process idx={0} is called '{1}'".format(self.idx, self.name))
if __name__ == "__main__":
NUMBER_OF_PROCESSES = 5
## Create a list to hold running Processor object instances...
processes = list()
q = Queue() # Build a single queue to send to all process objects...
for i in range(0, NUMBER_OF_PROCESSES):
p=Processor(queue=q, idx=i)
p.start()
processes.append(p)
# Incorporating ideas from this answer, below...
# https://stackoverflow.com/a/42137966/667301
[proc.join() for proc in processes]
while not q.empty():
print "RESULT: {0}".format(q.get()) # get results from the queue...
Trên máy tính của tôi, điều này dẫn đến ...
$ python test.py
RESULT: Process idx=0 is called 'Processor-1'
RESULT: Process idx=4 is called 'Processor-5'
RESULT: Process idx=3 is called 'Processor-4'
RESULT: Process idx=1 is called 'Processor-2'
RESULT: Process idx=2 is called 'Processor-3'
$
Sử dụng multiprocessing.Pool
:
FWIW, một trong những bất lợi mà tôi đã tìm thấy để phân lớp multiprocessing.Process
là bạn không thể tận dụng tất cả sự tích hợp sẵn trong số multiprocessing.Pool
; Pool
cung cấp cho bạn API rất đẹp nếu bạn không cần mã sản xuất và mã người tiêu dùng của bạn để nói chuyện với nhau thông qua hàng đợi.
Bạn có thể làm được rất nhiều chỉ với một số giá trị trả về sáng tạo ... trong ví dụ sau, tôi sử dụng một dict()
để đóng gói các giá trị đầu vào và đầu ra từ pool_job()
...
from multiprocessing import Pool
def pool_job(input_val=0):
# FYI, multiprocessing.Pool can't guarantee that it keeps inputs ordered correctly
# dict format is {input: output}...
return {'pool_job(input_val={0})'.format(input_val): int(input_val)*12}
pool = Pool(5) # Use 5 multiprocessing processes to handle jobs...
results = pool.map(pool_job, xrange(0, 12)) # map xrange(0, 12) into pool_job()
print results
Điều này dẫn đến:
[
{'pool_job(input_val=0)': 0},
{'pool_job(input_val=1)': 12},
{'pool_job(input_val=2)': 24},
{'pool_job(input_val=3)': 36},
{'pool_job(input_val=4)': 48},
{'pool_job(input_val=5)': 60},
{'pool_job(input_val=6)': 72},
{'pool_job(input_val=7)': 84},
{'pool_job(input_val=8)': 96},
{'pool_job(input_val=9)': 108},
{'pool_job(input_val=10)': 120},
{'pool_job(input_val=11)': 132}
]
Rõ ràng có nhiều cải tiến khác sẽ được thực hiện trong pool_job()
, chẳng hạn như xử lý lỗi, nhưng điều này minh họa các yếu tố cần thiết. FYI, this answer cung cấp một ví dụ khác về cách sử dụng multiprocessing.Pool
.
Vì vậy, trong một trong những phương thức phải chấp nhận đối tượng Queue là tham số đúng không? –
Xong! tôi đã tạo một phương thức init chấp nhận các hàng đợi. điều này trong lượt mở rộng multiprocessing.Process để chấp nhận Hàng đợi trực tiếp :) –
Cảm ơn sự sửa chữa. Mã này 'return self.queue.put (self.return_name())' trả về một hàng đợi? –