2012-07-18 40 views
6

Có cách nào để gửi lại một đoạn dữ liệu để xử lý, nếu tính toán ban đầu không thành công, sử dụng một nhóm đơn giản?retthon đa xử lý hồ bơi python

import random 
from multiprocessing import Pool 

def f(x): 
    if random.getrandbits(1): 
     raise ValueError("Retry this computation") 
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
+1

Có lẽ bạn muốn 'trở lại f (x) 'thay vì tăng' ValueError'? Chỉ cần đoán ... –

+0

Cơ hội thất bại trong ứng dụng thực tế của bạn cao bao nhiêu? Đó là, làm thế nào quan trọng là nó quá trình thử lại ngay lập tức như trái ngược với chờ đợi cho các quá trình khác để hoàn thành đầu tiên? – Isaac

+0

Đó là một cơ hội vừa phải thất bại, và nó không cần phải được thử lại ngay lập tức (nhưng nên được thử lại song song, cuối cùng). – ash

Trả lời

9

Nếu bạn có thể (hoặc không nhớ) đang thử lại ngay lập tức, sử dụng một trang trí gói các chức năng:

import random 
from multiprocessing import Pool 
from functools import wraps 

def retry(f): 
    @wraps(f) 
    def wrapped(*args, **kwargs): 
     while True: 
      try: 
       return f(*args, **kwargs) 
      except ValueError: 
       pass 
    return wrapped 

@retry 
def f(x): 
    if random.getrandbits(1): 
     raise ValueError("Retry this computation") 
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
5

Bạn có thể sử dụng một Queue để nuôi lại thất bại vào Pool qua một vòng lặp trong khởi Process:

import multiprocessing as mp 
import random 

def f(x): 
    if random.getrandbits(1): 
     # on failure/exception catch 
     f.q.put(x) 
     return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    total_items = len(pending) 
    successful = [] 
    failure_tracker = [] 

    q = mp.Queue() 
    p = mp.Pool(None, f_init, [q]) 
    results = p.imap(f, pending) 
    retry_results = [] 
    while len(successful) < total_items: 
     successful.extend([r for r in results if not r is None]) 
     successful.extend([r for r in retry_results if not r is None]) 
     failed_items = [] 
     while not q.empty(): 
      failed_items.append(q.get()) 
     if failed_items: 
      failure_tracker.append(failed_items) 
      retry_results = p.imap(f, failed_items); 
    p.close() 
    p.join() 

    print "Results: %s" % successful 
    print "Failures: %s" % failure_tracker 

if __name__ == '__main__': 
    main(range(1, 10)) 

Đầu ra là như thế này:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9] 
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []] 

Một không thể Pool được chia sẻ giữa nhiều quy trình. Do đó phương pháp tiếp cận dựa trên phương pháp này Queue. Nếu bạn cố gắng để vượt qua một hồ bơi như một tham số để các quá trình bơi, bạn sẽ nhận được lỗi này:

NotImplementedError: pool objects cannot be passed between processes or pickled 

Hoặc bạn có thể thử một vài lần thử lại ngay lập tức trong vòng chức năng của bạn f, để tránh đồng bộ trên cao. Nó thực sự là một vấn đề của chức năng của bạn sớm như thế nào nên chờ đợi để thử lại, và về khả năng một thành công là nếu thử lại ngay lập tức.


Cũ Trả lời:Vì lợi ích của sự hoàn chỉnh, đây là câu trả lời cũ của tôi, đó không phải là tối ưu như gửi lại trực tiếp vào hồ bơi, nhưng vẫn có thể có liên quan tùy thuộc vào trường hợp sử dụng , bởi vì nó cung cấp một cách tự nhiên để đối phó với/hạn n lần thử lại -level:

Bạn có thể sử dụng một Queue để thất bại tổng hợp và gửi lại vào cuối mỗi lần chạy, qua nhiều lần chạy:

import multiprocessing as mp 
import random 


def f(x): 
    if random.getrandbits(1): 
     # on failure/exception catch 
     f.q.put(x) 
     return None 
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    run_number = 1 
    while pending: 
     jobs = pending 
     pending = [] 

     q = mp.Queue() 
     p = mp.Pool(None, f_init, [q]) 
     results = p.imap(f, jobs) 
     p.close() 

     p.join() 
     failed_items = [] 
     while not q.empty(): 
      failed_items.append(q.get()) 
     successful = [r for r in results if not r is None] 
     print "(%d) Succeeded: %s" % (run_number, successful) 
     print "(%d) Failed: %s" % (run_number, failed_items) 
     print 
     pending = failed_items 
     run_number += 1 

if __name__ == '__main__': 
    main(range(1, 10)) 

với sản lượng như thế này:

(1) Succeeded: [9, 16, 36, 81] 
(1) Failed: [2, 1, 5, 7, 8] 

(2) Succeeded: [64] 
(2) Failed: [2, 1, 5, 7] 

(3) Succeeded: [1, 25] 
(3) Failed: [2, 7] 

(4) Succeeded: [49] 
(4) Failed: [2] 

(5) Succeeded: [4] 
(5) Failed: [] 
+0

Đã cập nhật câu trả lời của tôi cho câu trả lời không yêu cầu nhiều lần chạy và giờ đây hoạt động trên cùng một nhóm ban đầu. –

+0

Cảm ơn bạn đã phản hồi chi tiết. Tôi thích ý tưởng đặt các tính toán không thành công trong hàng đợi để được thử lại. Tôi phải trao giải Andrew với tiền thưởng vì giải pháp của anh ta làm một thử lại đơn giản. – ash

+0

@ash Tôi đã đề cập đến ngay lập tức retries trong phản ứng của tôi, nghĩ rằng nó sẽ là một bổ sung nhỏ/đơn giản và không phải những gì bạn đang tìm kiếm. Cũng lưu ý rằng nó (retries ngay lập tức) không phải là tối ưu cho tất cả các trường hợp, đặc biệt là những người thử lại ngay lập tức có cơ hội thành công thấp (trong trường hợp đó là rất tối ưu vì nó gây ra nạn đói tài nguyên cho các công việc có khả năng thành công.) Chúc mừng Andrew dù sao. –

Các vấn đề liên quan