2011-07-18 22 views
52

Dường như khi một ngoại lệ được nâng lên từ quá trình đa xử lý.Đơn vị, không có dấu vết ngăn xếp hoặc bất kỳ dấu hiệu nào khác cho thấy nó đã thất bại. Ví dụ:Ngoại lệ được ném trong đa xử lý Pool không được phát hiện

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = Pool() 
p.apply_async(go) 
p.close() 
p.join() 

in 1 và dừng âm thầm. Thật thú vị, nâng cao BaseException thay vì hoạt động. Có cách nào để thực hiện các hành vi cho tất cả các trường hợp ngoại lệ giống như BaseException?

+1

Tôi đã gặp vấn đề tương tự. Nguyên nhân là như sau: quy trình công nhân bắt ngoại lệ và đặt một mã lỗi và ngoại lệ trên hàng đợi kết quả. Quay trở lại quá trình chính, luồng xử lý kết quả của Pool nhận mã lỗi và chỉ bỏ qua nó. Một số loại chế độ gỡ lỗi khỉ vá có thể là có thể. Một giải pháp thay thế sẽ là đảm bảo chức năng công nhân của bạn bắt bất kỳ ngoại lệ nào, trả về nó và một mã lỗi cho trình xử lý của bạn để in. –

+0

Điều này đã được trả lời ở đây: http://stackoverflow.com/a/26096355/512111 – j08lue

Trả lời

23

Tôi có một giải pháp hợp lý cho vấn đề này, ít nhất là cho mục đích gỡ lỗi. Tôi hiện không có giải pháp nào sẽ làm tăng ngoại lệ trở lại trong các quy trình chính. Suy nghĩ đầu tiên của tôi là sử dụng một trang trí, nhưng bạn chỉ có thể chọn functions defined at the top level of a module, vì vậy điều đó đúng.

Thay vào đó, một lớp gói đơn giản và một lớp con Hồ bơi sử dụng lớp này cho apply_async (và do đó apply). Tôi sẽ để lại map_async làm bài tập cho người đọc.

import traceback 
from multiprocessing.pool import Pool 
import multiprocessing 

# Shortcut to multiprocessing's logger 
def error(msg, *args): 
    return multiprocessing.get_logger().error(msg, *args) 

class LogExceptions(object): 
    def __init__(self, callable): 
     self.__callable = callable 

    def __call__(self, *args, **kwargs): 
     try: 
      result = self.__callable(*args, **kwargs) 

     except Exception as e: 
      # Here we add some debugging help. If multiprocessing's 
      # debugging is on, it will arrange to log the traceback 
      error(traceback.format_exc()) 
      # Re-raise the original exception so the Pool worker can 
      # clean up 
      raise 

     # It was fine, give a normal answer 
     return result 

class LoggingPool(Pool): 
    def apply_async(self, func, args=(), kwds={}, callback=None): 
     return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

multiprocessing.log_to_stderr() 
p = LoggingPool(processes=1) 

p.apply_async(go) 
p.close() 
p.join() 

này mang lại cho tôi:

1 
[ERROR/PoolWorker-1] Traceback (most recent call last): 
    File "mpdebug.py", line 24, in __call__ 
    result = self.__callable(*args, **kwargs) 
    File "mpdebug.py", line 44, in go 
    raise Exception() 
Exception 
+0

Nó quá xấu không có một giải pháp đơn giản (hoặc một sai lầm về phía tôi) nhưng điều này sẽ giúp bạn hoàn thành công việc- cảm ơn! –

+4

Tôi đã nhận ra rằng trang trí có thể được sử dụng, nếu bạn sử dụng '@ functools.wraps (func)' để trang trí trình bao bọc của bạn. Điều này làm cho chức năng trang trí của bạn trông giống như một hàm được xác định ở cấp cao nhất của một mô-đun. –

+1

Giải pháp trong [câu trả lời này] (http://stackoverflow.com/a/26096355/512111) là đơn giản hơn ** và ** hỗ trợ tái tăng lỗi trong quá trình chính! – j08lue

0

Tôi muốn thử sử dụng pdb:

import pdb 
import sys 
def handler(type, value, tb): 
    pdb.pm() 
sys.excepthook = handler 
+0

Nó không bao giờ đạt được xử lý trong trường hợp đó, lạ –

42

Có lẽ tôi đang thiếu cái gì, nhưng không phải là những gì các phương pháp get của lợi nhuận đối tượng quả? Xem Process Pools.

lớp multiprocessing.pool.AsyncResult

Lớp kết quả được trả bởi Pool.apply_async() và Pool.map_async(). Get ([timeout])
Return kết quả khi nó đến. Nếu thời gian chờ không phải là Không và kết quả sẽ không đến trong vòng giây hết thời gian chờ sau đó đa xử lý.TimeoutError được nâng lên. Nếu cuộc gọi từ xa tăng lên ngoại lệ thì ngoại lệ đó sẽ bị reraised bởi get().

Vì vậy, một chút thay đổi ví dụ của bạn, người ta có thể làm

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception("foobar") 
    print(2) 

p = Pool() 
x = p.apply_async(go) 
x.get() 
p.close() 
p.join() 

Mà cho kết quả

1 
Traceback (most recent call last): 
    File "rob.py", line 10, in <module> 
    x.get() 
    File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get 
    raise self._value 
Exception: foobar 

Đây không phải là hoàn toàn thỏa đáng, vì nó không in traceback, nhưng là có còn hơn không.

CẬP NHẬT: Lỗi này đã được sửa trong Python 3.4, do Richard Oudkerk cung cấp. Xem sự cố get method of multiprocessing.pool.Async should return full traceback.

+0

Đó là hữu ích, cảm ơn. Tôi đã không thấy điều đó. –

+0

Hãy cho tôi biết nếu bạn tìm hiểu lý do tại sao nó không trả lại dấu vết. Vì nó có thể trả về giá trị lỗi, nên nó cũng có thể trả về dấu vết. Tôi có thể hỏi về một số diễn đàn phù hợp - có lẽ một số danh sách phát triển Python. BTW, như bạn có thể đoán, tôi đã xem qua câu hỏi của bạn trong khi cố gắng tìm ra điều tương tự. :-) –

+4

Lưu ý: để làm điều này cho một loạt các tác vụ chạy đồng thời, bạn nên lưu tất cả các kết quả vào danh sách, sau đó lặp qua từng kết quả với get(), có thể được bao quanh bởi try/catch nếu bạn không muốn crap ra trên các lỗi đầu tiên. – dfrankow

1

Tôi đã tạo một mô-đun RemoteException.py hiển thị toàn bộ dấu vết của một ngoại lệ trong một quy trình. Python2.Download it và thêm video này vào mã của bạn:

import RemoteException 

@RemoteException.showError 
def go(): 
    raise Exception('Error!') 

if __name__ == '__main__': 
    import multiprocessing 
    p = multiprocessing.Pool(processes = 1) 
    r = p.apply(go) # full traceback is shown here 
7

Tôi đã có trường hợp ngoại lệ khai thác gỗ thành công với trang trí này:

import traceback, functools, multiprocessing 

def trace_unhandled_exceptions(func): 
    @functools.wraps(func) 
    def wrapped_func(*args, **kwargs): 
     try: 
      func(*args, **kwargs) 
     except: 
      print 'Exception in '+func.__name__ 
      traceback.print_exc() 
    return wrapped_func 

với mã trong câu hỏi, đó là

@trace_unhandled_exceptions 
def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = multiprocessing.Pool(1) 

p.apply_async(go) 
p.close() 
p.join() 

Đơn giản chỉ cần trang trí chức năng mà bạn chuyển đến nhóm xử lý của mình. Chìa khóa để làm việc này là @functools.wraps(func) nếu không đa xử lý sẽ ném một số PicklingError.

mã trên cho

1 
Exception in go 
Traceback (most recent call last): 
    File "<stdin>", line 5, in wrapped_func 
    File "<stdin>", line 4, in go 
Exception 
+0

Điều này không hoạt động nếu chức năng đang chạy song song - go() trong trường hợp này - trả về một giá trị. Người trang trí không vượt qua giá trị trả lại thông qua. Khác hơn là tôi thích giải pháp này. – MD004

+0

Ví đi qua giá trị trả về chỉ thay đổi wrapper_func như thế này: 'def wrapped_func (* args, ** kwargs): result = Không thử: result = func (* args, ** kwargs) trừ: in ('Ngoại lệ trong' + func .__ name__) traceback.print_exc() kết quả trả về ' Hoạt động như sự quyến rũ;) – MoTSCHIGGE

16

Các giải pháp với số phiếu cao nhất tại thời điểm viết bài có một vấn đề:

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception("foobar") 
    print(2) 

p = Pool() 
x = p.apply_async(go) 
x.get() ## waiting here for go() to complete... 
p.close() 
p.join() 

Như @dfrankow ghi chú khác, nó sẽ đợi trên x.get(), mà làm hỏng điểm của việc chạy một nhiệm vụ không đồng bộ. Vì vậy, cho hiệu quả tốt hơn (đặc biệt nếu chức năng lao động của bạn go mất một thời gian dài) tôi sẽ thay đổi nó thành:

from multiprocessing import Pool 

def go(x): 
    print(1) 
    # task_that_takes_a_long_time() 
    raise Exception("Can't go anywhere.") 
    print(2) 
    return x**2 

p = Pool() 
results = [] 
for x in range(1000): 
    results.append(p.apply_async(go, [x])) 

p.close() 

for r in results: 
    r.get() 

Ưu: chức năng lao động được điều hành không đồng bộ, vì vậy nếu ví dụ bạn đang chạy nhiều nhiệm vụ trên một số lõi, nó sẽ hiệu quả hơn rất nhiều so với giải pháp ban đầu.

Nhược: nếu có một ngoại lệ trong hàm công nhân, nó sẽ chỉ được nâng lên sau hồ bơi đã hoàn thành tất cả các nhiệm vụ. Điều này có thể hoặc không thể là hành vi mong muốn. CHỈNH SỬA theo nhận xét của @ colinfang, đã sửa lỗi này.

+0

Nỗ lực tốt. Tuy nhiên, vì ví dụ của bạn được xác định dựa trên giả định rằng có nhiều kết quả, có thể mở rộng nó một chút để có, trên thực tế, nhiều kết quả? Ngoài ra, bạn viết: "đặc biệt nếu bạn làm việc chức năng". Đó nên là "của bạn". –

+0

Bạn nói đúng, cảm ơn. Tôi đã mở rộng ví dụ một chút. – gozzilli

+1

Tuyệt. Ngoài ra, bạn có thể muốn thử/ngoại trừ, tùy thuộc vào cách bạn muốn chịu đựng lỗi trong quá trình tìm nạp. – dfrankow

0

Vì bạn đã sử dụng apply_sync, tôi đoán trường hợp sử dụng là muốn thực hiện một số tác vụ đồng bộ hóa. Sử dụng gọi lại để xử lý là một tùy chọn khác. Xin lưu ý tùy chọn này chỉ có sẵn cho python3.2 trở lên và không có sẵn trên python2.7.

from multiprocessing import Pool 

def callback(result): 
    print('success', result) 

def callback_error(result): 
    print('error', result) 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = Pool() 
p.apply_async(go, callback=callback, error_callback=callback_error) 

# You can do another things 

p.close() 
p.join() 
+0

không có 'error_callbak' như vậy cho phương thức' apply_async', hãy tham khảo https://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool .apply_async – Sanju

+1

cho phiên bản sau: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async – Asoul

1
import logging 
from multiprocessing import Pool 

def proc_wrapper(func, *args, **kwargs): 
    """Print exception because multiprocessing lib doesn't return them right.""" 
    try: 
     return func(*args, **kwargs) 
    except Exception as e: 
     logging.exception(e) 
     raise 

def go(x): 
    print x 
    raise Exception("foobar") 

p = Pool() 
p.apply_async(proc_wrapper, (go, 5)) 
p.join() 
p.close() 
0

Do đã có câu trả lời đàng hoàng cho multiprocessing.Pool sẵn, tôi sẽ cung cấp một giải pháp sử dụng một cách tiếp cận khác nhau cho đầy đủ.

Đối python >= 3.2 các giải pháp sau đây có vẻ là đơn giản nhất:

from concurrent.futures import ProcessPoolExecutor, wait 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 


futures = [] 
with ProcessPoolExecutor() as p: 
    for i in range(10): 
     futures.append(p.submit(go)) 

results = [f.result() for f in futures] 

Ưu điểm:

  • rất ít mã
  • đặt ra một ngoại lệ trong quá trình chính
  • cung cấp một stack trace
  • không có phụ thuộc bên ngoài

Đối với thông tin thêm về API xin vui lòng kiểm tra: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

Thêm vào đó, nếu bạn đang gửi một số lượng lớn các nhiệm vụ và bạn muốn quá trình chính của bạn thất bại trong thời gian sớm là một trong những nhiệm vụ của bạn thất bại, bạn có thể sử dụng đoạn mã sau:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed 
import time 


def go(): 
    print(1) 
    time.sleep(0.3) 
    raise Exception() 
    print(2) 


futures = [] 
with ProcessPoolExecutor(1) as p: 
    for i in range(10): 
     futures.append(p.submit(go)) 

    for f in as_completed(futures): 
     if f.exception() is not None: 
      for f in futures: 
       f.cancel() 
      break 

[f.result() for f in futures] 

Tất cả các câu trả lời khác không chỉ một khi tất cả các tác vụ được thực hiện.

Các vấn đề liên quan