2017-08-30 23 views
16

Đây là chương trình số nguyên tố chính của tôi, tôi đã thêm một hàm gọi lại trong pool.apply_async(findK, args=(N,begin,end)), một thông báo nhắc ra prime factorization is over khi hệ số hóa kết thúc, nó hoạt động tốt.Làm thế nào để vượt qua trường hợp đa xử lý.Pool để apply_async chức năng gọi lại?

import math 
import multiprocessing 

def findK(N,begin,end): 
    for k in range(begin,end): 
     if N% k == 0: 
      print(N,"=" ,k ,"*", N/k) 
      return True 
    return False 


def prompt(result): 
    if result: 
     print("prime factorization is over") 


def mainFun(N,process_num): 
    pool = multiprocessing.Pool(process_num) 
    for i in range(process_num): 
     if i ==0 : 
      begin =2 
     else: 
      begin = int(math.sqrt(N)/process_num*i)+1 
     end = int(math.sqrt(N)/process_num*(i+1)) 
     pool.apply_async(findK, args=(N,begin,end) , callback = prompt)  
    pool.close() 
    pool.join()  

if __name__ == "__main__": 
    N = 684568031001583853 
    process_num = 16 
    mainFun(N,process_num) 

Bây giờ tôi muốn thay đổi chức năng gọi lại trong apply_async, để thay đổi lời nhắc thành chức năng tắt máy để tiêu diệt tất cả các quy trình khác.

def prompt(result): 
    if result: 
     pool.terminate() 

Trường hợp hồ bơi không được xác định trong phạm vi nhanh hoặc được chuyển vào lời nhắc.
pool.terminate() không thể hoạt động trong chức năng nhắc.
Làm thế nào để vượt qua đa xử lý.Đối tượng dụ để apply_async'callback chức năng?
(Tôi đã thực hiện nó được thực hiện ở định dạng lớp, chỉ cần thêm một phương thức lớp và gọi self.pool.terminate có thể giết chết tất cả các quá trình khác, làm thế nào để thực hiện công việc ở định dạng chức năng?)

nếu không thiết lập hồ bơi như biến toàn cầu, có thể pool được chuyển vào hàm callback?

Trả lời

7

Passing các đối số thừa cho hàm gọi lại không được hỗ trợ. Tuy nhiên, bạn có rất nhiều cách thanh lịch để giải quyết vấn đề đó.

Bạn có thể đóng gói logic hồ bơi của bạn thành một đối tượng:

class Executor: 
    def __init__(self, process_num): 
     self.pool = multiprocessing.Pool(process_num) 

    def prompt(self, result): 
     if result: 
      print("prime factorization is over") 
      self.pool.terminate() 

    def schedule(self, function, args): 
     self.pool.apply_async(function, args=args, callback=self.prompt) 

    def wait(self): 
     self.pool.close() 
     self.pool.join() 


def main(N,process_num): 
    executor = Executor(process_num) 
    for i in range(process_num): 
     ... 
     executor.schedule(findK, (N,begin,end)) 
    executor.wait() 

Hoặc bạn có thể sử dụng việc thực hiện concurrent.futures.Executor mà trả về một đối tượng Future. Bạn chỉ cần thêm hồ bơi vào đối tượng Future trước khi thiết lập gọi lại.

def prompt(future): 
    if future.result(): 
     print("prime factorization is over") 
     future.pool_executor.shutdown(wait=False) 

def main(N,process_num): 
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num) 
    for i in range(process_num): 
     ... 
     future = executor.submit(findK, N,begin,end) 
     future.pool_executor = executor 
     future.add_done_callback(prompt) 
4

Bạn cần có pool kết thúc trong môi trường prompt. Một khả năng là di chuyển pool vào phạm vi toàn cầu (mặc dù điều này không thực sự là thực hành tốt nhất). Điều này dường như làm việc:

import math 
import multiprocessing 

pool = None 

def findK(N,begin,end): 
    for k in range(begin,end): 
     if N% k == 0: 
      print(N,"=" ,k ,"*", N/k) 
      return True 
    return False 


def prompt(result): 
    if result: 
     print("prime factorization is over") 
     pool.terminate() 


def mainFun(N,process_num): 
    global pool 
    pool = multiprocessing.Pool(process_num) 
    for i in range(process_num): 
     if i ==0 : 
      begin =2 
     else: 
      begin = int(math.sqrt(N)/process_num*i)+1 
     end = int(math.sqrt(N)/process_num*(i+1)) 
     pool.apply_async(findK, args=(N,begin,end) , callback = prompt)  
    pool.close() 
    pool.join()  

if __name__ == "__main__": 
    N = 684568031001583853 
    process_num = 16 
    mainFun(N,process_num) 
+0

nếu không đặt nhóm làm biến toàn cầu, nhóm có thể được chuyển vào hàm gọi lại không? –

4

Bạn chỉ có thể định nghĩa một hàm cục bộ close như một callback:

import math 
import multiprocessing 


def findK(N, begin, end): 
    for k in range(begin, end): 
     if N % k == 0: 
      print(N, "=", k, "*", N/k) 
      return True 
    return False 


def mainFun(N, process_num): 
    pool = multiprocessing.Pool(process_num) 

    def close(result): 
     if result: 
      print("prime factorization is over") 
      pool.terminate() 
    for i in range(process_num): 
     if i == 0: 
      begin = 2 
     else: 
      begin = int(math.sqrt(N)/process_num * i) + 1 
     end = int(math.sqrt(N)/process_num * (i + 1)) 
     pool.apply_async(findK, args=(N, begin, end), callback=close) 
    pool.close() 
    pool.join() 


if __name__ == "__main__": 
    N = 684568031001583853 
    process_num = 16 
    mainFun(N, process_num) 

Bạn cũng có thể sử dụng một hàm partial từ functool, với

import functools 

def close_pool(pool, results): 
    if result: 
     pool.terminate() 

def mainFun(N, process_num): 
    pool = multiprocessing.Pool(process_num) 

    close = funtools.partial(close_pool, pool) 
.... 
Các vấn đề liên quan