2015-02-06 16 views
8

Tôi có chức năng thực hiện một số mô phỏng và trả về một mảng ở định dạng chuỗi.Đa xử lý Python - theo dõi quá trình hoạt động pool.map

Tôi muốn chạy mô phỏng (hàm) cho giá trị tham số đầu vào khác nhau, hơn 10000 giá trị đầu vào có thể, và ghi kết quả vào một tệp.

Tôi đang sử dụng đa xử lý, cụ thể là chức năng pool.map để chạy mô phỏng song song.

Vì toàn bộ quá trình chạy chức năng mô phỏng trên 10000 lần mất một thời gian rất dài, tôi thực sự muốn theo dõi quá trình của toàn bộ thao tác.

Tôi nghĩ rằng sự cố trong mã hiện tại của tôi bên dưới là, pool.map chạy hàm 10000 lần mà không cần theo dõi quá trình trong các hoạt động đó. Khi xử lý song song kết thúc chạy 10000 mô phỏng (có thể là hàng giờ.), Sau đó tôi tiếp tục theo dõi khi 10000 kết quả mô phỏng đang được lưu vào một tệp..Điều này không thực sự theo dõi việc xử lý hoạt động pool.map.

Có cách khắc phục dễ dàng mã của tôi sẽ cho phép theo dõi quy trình không?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

Trả lời

7

Nếu bạn sử dụng chức năng map được lặp lại, bạn có thể dễ dàng theo dõi tiến độ.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> def simFunction(x,y): 
... import time 
... time.sleep(2) 
... return x**2 + y 
... 
>>> x,y = range(100),range(-100,100,2) 
>>> res = Pool().imap(simFunction, x,y) 
>>> with open('results.txt', 'w') as out: 
... for i in x: 
...  out.write("%s\n" % res.next()) 
...  if i%10 is 0: 
...  print "%s of %s simulated" % (i, len(x)) 
... 
0 of 100 simulated 
10 of 100 simulated 
20 of 100 simulated 
30 of 100 simulated 
40 of 100 simulated 
50 of 100 simulated 
60 of 100 simulated 
70 of 100 simulated 
80 of 100 simulated 
90 of 100 simulated 

Hoặc bạn có thể sử dụng không đồng bộ map. Ở đây tôi sẽ làm những việc khác một chút, chỉ để trộn nó lên.

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

Lưu ý rằng tôi đang sử dụng pathos.multiprocessing thay vì multiprocessing. Nó chỉ là một ngã ba của multiprocessing cho phép bạn thực hiện các chức năng map với nhiều đầu vào, có tuần tự hóa tốt hơn nhiều và cho phép bạn thực hiện các cuộc gọi map ở bất kỳ đâu (không chỉ trong __main__). Bạn cũng có thể sử dụng multiprocessing để thực hiện điều này, tuy nhiên mã sẽ hơi khác một chút.

Hoặc là map được lặp lại hoặc không đồng bộ sẽ cho phép bạn viết bất kỳ mã nào bạn muốn thực hiện theo dõi quy trình tốt hơn.Ví dụ: chuyển một "id" duy nhất cho từng công việc và xem quay lại hoặc có mỗi công việc trả về id quá trình của nó. Có rất nhiều cách để theo dõi tiến độ và quy trình… nhưng ở trên sẽ cho bạn một sự khởi đầu.

Bạn có thể nhận pathos đây: https://github.com/uqfoundation

+0

cảm ơn bạn rất nhiều! – user32147

3

Không có "sửa lỗi dễ dàng". map là tất cả về việc ẩn chi tiết triển khai từ bạn. Và trong trường hợp này, bạn cần muốn chi tiết. Đó là, mọi thứ trở nên phức tạp hơn một chút, theo định nghĩa. Bạn cần phải thay đổi mô hình truyền thông. Có nhiều cách để làm như vậy.

Một là: tạo Hàng đợi để thu thập kết quả của bạn và để nhân viên của bạn đưa kết quả vào hàng đợi này. Sau đó, bạn có thể, từ bên trong một luồng hoặc quy trình giám sát, nhìn vào hàng đợi và tiêu thụ kết quả khi chúng đang đến. Trong khi tiêu thụ, bạn có thể phân tích chúng và tạo đầu ra bản ghi. Đây có thể là cách tổng quát nhất để theo dõi tiến trình: bạn có thể trả lời các kết quả đến theo bất kỳ cách nào, theo thời gian thực.

Cách đơn giản hơn có thể là sửa đổi một chút chức năng công nhân của bạn và tạo đầu ra nhật ký trong đó. Bằng cách phân tích cẩn thận đầu ra nhật ký bằng các công cụ bên ngoài (chẳng hạn như grepwc), bạn có thể tìm ra phương tiện theo dõi rất đơn giản.

+1

cảm ơn bạn. bạn có thể cung cấp một số ví dụ đơn giản không? – user32147

3

Tôi nghĩ rằng những gì bạn cần là một tập tin log .

Tôi khuyên bạn nên sử dụng mô-đun ghi nhật ký là một phần của thư viện chuẩn Python. Nhưng tiếc là ghi nhật ký không phải là đa xử lý an toàn. Vì vậy, bạn không thể sử dụng nó out-of-the-box trong ứng dụng của bạn.

Vì vậy, bạn sẽ cần sử dụng trình xử lý nhật ký đa xử lý an toàn hoặc triển khai trình xử lý của bạn bằng Hàng đợi hoặc khóa cùng với mô-đun ghi nhật ký.

Có rất nhiều cuộc thảo luận về điều này trong Stackoverflow. Đây ví dụ: How should I log while using multiprocessing in Python?

Nếu hầu hết các CPU đang trong hàm mô phỏng và bạn sẽ không phải sử dụng luân phiên đăng nhập, bạn có thể có thể sử dụng một cơ chế khóa đơn giản như thế này:

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

Bạn có thể "tail -f" tệp nhật ký của bạn khi chạy. Đây là những gì bạn sẽ thấy:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

Đã thử trên Windows và Linux.

Hy vọng điều này sẽ giúp

+0

'multiprocessing.get_logger()' trả về trình ghi nhật ký giới hạn tính năng được bảo vệ bằng khóa, xem https://docs.python.org/2/library/multiprocessing.html#logging –

+0

Có, nhưng đây là trình ghi nhật ký mô-đun ... Vì vậy, bạn có thể sử dụng nó, nhật ký của bạn sẽ được trộn với các thông báo ở mức mô-đun: Hãy thử nó và bạn sẽ thấy các thông báo như sau: 2015-02-08 23: 47: 10,954 9288 DEBUG tạo khóa liên kết với tay cầm 448 –

+0

Ồ, bạn đúng, tôi chưa bao giờ sử dụng nó thực sự và lướt qua các tài liệu quá nhanh. –

Các vấn đề liên quan