Bạn có thể sử dụng một Queue
để nuôi lại thất bại vào Pool
qua một vòng lặp trong khởi Process
:
import multiprocessing as mp
import random
def f(x):
if random.getrandbits(1):
# on failure/exception catch
f.q.put(x)
return None
return x*x
def f_init(q):
f.q = q
def main(pending):
total_items = len(pending)
successful = []
failure_tracker = []
q = mp.Queue()
p = mp.Pool(None, f_init, [q])
results = p.imap(f, pending)
retry_results = []
while len(successful) < total_items:
successful.extend([r for r in results if not r is None])
successful.extend([r for r in retry_results if not r is None])
failed_items = []
while not q.empty():
failed_items.append(q.get())
if failed_items:
failure_tracker.append(failed_items)
retry_results = p.imap(f, failed_items);
p.close()
p.join()
print "Results: %s" % successful
print "Failures: %s" % failure_tracker
if __name__ == '__main__':
main(range(1, 10))
Đầu ra là như thế này:
Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]
Một không thể Pool
được chia sẻ giữa nhiều quy trình. Do đó phương pháp tiếp cận dựa trên phương pháp này Queue
. Nếu bạn cố gắng để vượt qua một hồ bơi như một tham số để các quá trình bơi, bạn sẽ nhận được lỗi này:
NotImplementedError: pool objects cannot be passed between processes or pickled
Hoặc bạn có thể thử một vài lần thử lại ngay lập tức trong vòng chức năng của bạn f
, để tránh đồng bộ trên cao. Nó thực sự là một vấn đề của chức năng của bạn sớm như thế nào nên chờ đợi để thử lại, và về khả năng một thành công là nếu thử lại ngay lập tức.
Cũ Trả lời:Vì lợi ích của sự hoàn chỉnh, đây là câu trả lời cũ của tôi, đó không phải là tối ưu như gửi lại trực tiếp vào hồ bơi, nhưng vẫn có thể có liên quan tùy thuộc vào trường hợp sử dụng , bởi vì nó cung cấp một cách tự nhiên để đối phó với/hạn n
lần thử lại -level:
Bạn có thể sử dụng một Queue
để thất bại tổng hợp và gửi lại vào cuối mỗi lần chạy, qua nhiều lần chạy:
import multiprocessing as mp
import random
def f(x):
if random.getrandbits(1):
# on failure/exception catch
f.q.put(x)
return None
return x*x
def f_init(q):
f.q = q
def main(pending):
run_number = 1
while pending:
jobs = pending
pending = []
q = mp.Queue()
p = mp.Pool(None, f_init, [q])
results = p.imap(f, jobs)
p.close()
p.join()
failed_items = []
while not q.empty():
failed_items.append(q.get())
successful = [r for r in results if not r is None]
print "(%d) Succeeded: %s" % (run_number, successful)
print "(%d) Failed: %s" % (run_number, failed_items)
print
pending = failed_items
run_number += 1
if __name__ == '__main__':
main(range(1, 10))
với sản lượng như thế này:
(1) Succeeded: [9, 16, 36, 81]
(1) Failed: [2, 1, 5, 7, 8]
(2) Succeeded: [64]
(2) Failed: [2, 1, 5, 7]
(3) Succeeded: [1, 25]
(3) Failed: [2, 7]
(4) Succeeded: [49]
(4) Failed: [2]
(5) Succeeded: [4]
(5) Failed: []
Có lẽ bạn muốn 'trở lại f (x) 'thay vì tăng' ValueError'? Chỉ cần đoán ... –
Cơ hội thất bại trong ứng dụng thực tế của bạn cao bao nhiêu? Đó là, làm thế nào quan trọng là nó quá trình thử lại ngay lập tức như trái ngược với chờ đợi cho các quá trình khác để hoàn thành đầu tiên? – Isaac
Đó là một cơ hội vừa phải thất bại, và nó không cần phải được thử lại ngay lập tức (nhưng nên được thử lại song song, cuối cùng). – ash