11

Tôi sử dụng thư viện đa xử lý python cho một thuật toán trong đó tôi có nhiều công nhân xử lý dữ liệu nhất định và trả về kết quả cho quá trình cha mẹ. Tôi sử dụng multiprocessing.Queue để chuyển công việc cho người lao động và thứ hai để thu thập kết quả.Python đa xử lý và xử lý ngoại lệ trong công nhân

Tất cả hoạt động khá tốt, cho đến khi nhân viên không xử lý được một số dữ liệu. Trong ví dụ đơn giản dưới đây mỗi công nhân có hai giai đoạn:

  • khởi - có thể thất bại, trong việc này nhân viên phụ trách nên bị phá hủy
  • xử lý dữ liệu - chế biến một đoạn dữ liệu có thể thất bại, trong lao động trường hợp này nên bỏ qua và tiếp tục với dữ liệu tiếp theo.

Khi một trong các giai đoạn này không thành công, tôi sẽ bị bế tắc sau khi hoàn thành tập lệnh. Mã này mô phỏng vấn đề của tôi:

import multiprocessing as mp 
import random 

workers_count = 5 
# Probability of failure, change to simulate failures 
fail_init_p = 0.2 
fail_job_p = 0.3 


#========= Worker ========= 
def do_work(job_state, arg): 
    if random.random() < fail_job_p: 
     raise Exception("Job failed") 
    return "job %d processed %d" % (job_state, arg) 

def init(args): 
    if random.random() < fail_init_p: 
     raise Exception("Worker init failed") 
    return args 

def worker_function(args, jobs_queue, result_queue): 
    # INIT 
    # What to do when init() fails? 
    try: 
     state = init(args) 
    except: 
     print "!Worker %d init fail" % args 
     return 
    # DO WORK 
    # Process data in the jobs queue 
    for job in iter(jobs_queue.get, None): 
     try: 
      # Can throw an exception! 
      result = do_work(state, job) 
      result_queue.put(result) 
     except: 
      print "!Job %d failed, skip..." % job 
     finally: 
      jobs_queue.task_done() 
    # Telling that we are done with processing stop token 
    jobs_queue.task_done() 



#========= Parent ========= 
jobs = mp.JoinableQueue() 
results = mp.Queue() 
for i in range(workers_count): 
    mp.Process(target=worker_function, args=(i, jobs, results)).start() 

# Populate jobs queue 
results_to_expect = 0 
for j in range(30): 
    jobs.put(j) 
    results_to_expect += 1 

# Collecting the results 
# What if some workers failed to process the job and we have 
# less results than expected 
for r in range(results_to_expect): 
    result = results.get() 
    print result 

#Signal all workers to finish 
for i in range(workers_count): 
    jobs.put(None) 

#Wait for them to finish 
jobs.join() 

Tôi có hai câu hỏi về mã này:

  1. Khi init() thất bại, làm thế nào để phát hiện người lao động đó là không hợp lệ và không phải chờ đợi cho nó để kết thúc?
  2. Khi do_work() không thành công, cách thông báo cho quy trình gốc mà kết quả sẽ ít hơn trong hàng đợi kết quả?

Cảm ơn bạn đã trợ giúp!

Trả lời

10

Tôi đã thay đổi mã của bạn một chút để làm cho nó hoạt động (xem giải thích bên dưới).

import multiprocessing as mp 
import random 

workers_count = 5 
# Probability of failure, change to simulate failures 
fail_init_p = 0.5 
fail_job_p = 0.4 


#========= Worker ========= 
def do_work(job_state, arg): 
    if random.random() < fail_job_p: 
     raise Exception("Job failed") 
    return "job %d processed %d" % (job_state, arg) 

def init(args): 
    if random.random() < fail_init_p: 
     raise Exception("Worker init failed") 
    return args 

def worker_function(args, jobs_queue, result_queue): 
    # INIT 
    # What to do when init() fails? 
    try: 
     state = init(args) 
    except: 
     print "!Worker %d init fail" % args 
     result_queue.put('init failed') 
     return 
    # DO WORK 
    # Process data in the jobs queue 
    for job in iter(jobs_queue.get, None): 
     try: 
      # Can throw an exception! 
      result = do_work(state, job) 
      result_queue.put(result) 
     except: 
      print "!Job %d failed, skip..." % job 
      result_queue.put('job failed') 


#========= Parent ========= 
jobs = mp.Queue() 
results = mp.Queue() 
for i in range(workers_count): 
    mp.Process(target=worker_function, args=(i, jobs, results)).start() 

# Populate jobs queue 
results_to_expect = 0 
for j in range(30): 
    jobs.put(j) 
    results_to_expect += 1 

init_failures = 0 
job_failures = 0 
successes = 0 
while job_failures + successes < 30 and init_failures < workers_count: 
    result = results.get() 
    init_failures += int(result == 'init failed') 
    job_failures += int(result == 'job failed') 
    successes += int(result != 'init failed' and result != 'job failed') 
    #print init_failures, job_failures, successes 

for ii in range(workers_count): 
    jobs.put(None) 

thay đổi của tôi:

  1. Changed jobs phải chỉ là một bình thường Queue (thay vì JoinableQueue).
  2. Công nhân giờ đây giao tiếp lại các chuỗi kết quả đặc biệt "init failed" và "job failed".
  3. Quy trình tổng thể giám sát các kết quả đặc biệt cho biết miễn là các điều kiện cụ thể có hiệu lực.
  4. Cuối cùng, hãy đặt các yêu cầu "dừng" (tức là None công việc) cho tuy nhiên nhiều công nhân bạn có, bất kể. Lưu ý rằng không phải tất cả những điều này có thể được kéo ra khỏi hàng đợi (trong trường hợp người lao động không thành công).

Nhân tiện, mã ban đầu của bạn thật đẹp và dễ làm việc. Các bit xác suất ngẫu nhiên là khá mát mẻ.

+2

hoặc bạn có thể đặt một bộ dữ liệu '(kết quả, lỗi)' (lỗi là Không thành công) vào hàng đợi kết quả để tránh giao tiếp trong băng bị lỗi. – jfs