Tôi sử dụng thư viện đa xử lý python cho một thuật toán trong đó tôi có nhiều công nhân xử lý dữ liệu nhất định và trả về kết quả cho quá trình cha mẹ. Tôi sử dụng multiprocessing.Queue để chuyển công việc cho người lao động và thứ hai để thu thập kết quả.Python đa xử lý và xử lý ngoại lệ trong công nhân
Tất cả hoạt động khá tốt, cho đến khi nhân viên không xử lý được một số dữ liệu. Trong ví dụ đơn giản dưới đây mỗi công nhân có hai giai đoạn:
- khởi - có thể thất bại, trong việc này nhân viên phụ trách nên bị phá hủy
- xử lý dữ liệu - chế biến một đoạn dữ liệu có thể thất bại, trong lao động trường hợp này nên bỏ qua và tiếp tục với dữ liệu tiếp theo.
Khi một trong các giai đoạn này không thành công, tôi sẽ bị bế tắc sau khi hoàn thành tập lệnh. Mã này mô phỏng vấn đề của tôi:
import multiprocessing as mp
import random
workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3
#========= Worker =========
def do_work(job_state, arg):
if random.random() < fail_job_p:
raise Exception("Job failed")
return "job %d processed %d" % (job_state, arg)
def init(args):
if random.random() < fail_init_p:
raise Exception("Worker init failed")
return args
def worker_function(args, jobs_queue, result_queue):
# INIT
# What to do when init() fails?
try:
state = init(args)
except:
print "!Worker %d init fail" % args
return
# DO WORK
# Process data in the jobs queue
for job in iter(jobs_queue.get, None):
try:
# Can throw an exception!
result = do_work(state, job)
result_queue.put(result)
except:
print "!Job %d failed, skip..." % job
finally:
jobs_queue.task_done()
# Telling that we are done with processing stop token
jobs_queue.task_done()
#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
mp.Process(target=worker_function, args=(i, jobs, results)).start()
# Populate jobs queue
results_to_expect = 0
for j in range(30):
jobs.put(j)
results_to_expect += 1
# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
result = results.get()
print result
#Signal all workers to finish
for i in range(workers_count):
jobs.put(None)
#Wait for them to finish
jobs.join()
Tôi có hai câu hỏi về mã này:
- Khi
init()
thất bại, làm thế nào để phát hiện người lao động đó là không hợp lệ và không phải chờ đợi cho nó để kết thúc? - Khi
do_work()
không thành công, cách thông báo cho quy trình gốc mà kết quả sẽ ít hơn trong hàng đợi kết quả?
Cảm ơn bạn đã trợ giúp!
hoặc bạn có thể đặt một bộ dữ liệu '(kết quả, lỗi)' (lỗi là Không thành công) vào hàng đợi kết quả để tránh giao tiếp trong băng bị lỗi. – jfs