6

Gần đây tôi đã bắt đầu học 0MQ. Trước đó ngày hôm nay, tôi đã chạy vào một blog, Python Multiprocessing with ZeroMQ. Nó đã nói về the ventilator pattern trong Hướng dẫn 0MQ mà tôi đã đọc, vì vậy tôi đã quyết định thử.Tại sao tập lệnh Python 0MQ này cho máy tính phân tán bị treo ở kích thước đầu vào cố định?

Thay vì chỉ tính số sản phẩm của các công nhân làm mã gốc, tôi quyết định cố gắng làm cho máy thở gửi các mảng lớn tới công nhân qua tin nhắn 0mq. Sau đây là mã mà tôi đã sử dụng cho "thử nghiệm của mình".

Như đã lưu ý trong nhận xét bên dưới, bất kỳ lúc nào tôi cố gắng tăng biến string_length thành số lớn hơn 3MB, mã bị treo.

Triệu chứng điển hình: giả sử chúng tôi đặt string_length thành 4MB (tức là 4194304), có thể là người quản lý kết quả nhận được kết quả từ một công nhân và sau đó mã tạm dừng. htop cho thấy 2 lõi không làm nhiều. Trình giám sát lưu lượng mạng Etherape cho thấy không có lưu lượng trên giao diện lo.

Cho đến giờ, sau nhiều giờ nhìn xung quanh, tôi không thể tìm ra nguyên nhân gây ra điều này và sẽ đánh giá cao một hoặc hai gợi ý về lý do và giải pháp về vấn đề này. Cảm ơn!

Tôi đang chạy Ubuntu 11.04 64bit trên máy tính xách tay Dell với CPU Intel Core do, RAM 8GB, SSD Intel X25MG2 80 GB, Python 2.7.1+, libzmq1 2.1.10-1chl1 ~ natty1, python-pyzmq 2.1.10- 1chl1 ~ natty1

import time 
import zmq 
from multiprocessing import Process, cpu_count 

np = cpu_count() 
pool_size = np 
number_of_elements = 128 
# Odd, why once the slen is bumped to 3MB or above, the code hangs? 
string_length = 1024 * 1024 * 3 

def create_inputs(nelem, slen, pb=True): 
    ''' 
    Generates an array that contains nelem fix-sized (of slen bytes) 
    random strings and an accompanying array of hexdigests of the 
    former's elements. Both are returned in a tuple. 

    :type nelem: int 
    :param nelem: The desired number of elements in the to be generated 
        array. 
    :type slen: int 
    :param slen: The desired number of bytes of each array element. 
    :type pb: bool 
    :param pb: If True, displays a text progress bar during input array 
       generation. 
    ''' 
    from os import urandom 
    import sys 
    import hashlib 

    if pb: 
     if nelem <= 64: 
      toolbar_width = nelem 
      chunk_size = 1 
     else: 
      toolbar_width = 64 
      chunk_size = nelem // toolbar_width 
     description = '%d random strings of %d bytes. ' % (nelem, slen) 
     s = ''.join(('Generating an array of ', description, '...\n')) 
     sys.stdout.write(s) 
     # create an ASCII progress bar 
     sys.stdout.write("[%s]" % (" " * toolbar_width)) 
     sys.stdout.flush() 
     sys.stdout.write("\b" * (toolbar_width+1)) 
    array = list() 
    hash4a = list() 
    try: 
     for i in range(nelem): 
      e = urandom(int(slen)) 
      array.append(e) 
      h = hashlib.md5() 
      h.update(e) 
      he = h.hexdigest() 
      hash4a.append(he) 
      i += 1 
      if pb and i and i % chunk_size == 0: 
       sys.stdout.write("-") 
       sys.stdout.flush() 
     if pb: 
      sys.stdout.write("\n") 
    except MemoryError: 
     print('Memory Error: discarding existing arrays') 
     array = list() 
     hash4a = list() 
    finally: 
     return array, hash4a 

# The "ventilator" function generates an array of nelem fix-sized (of slen 
# bytes long) random strings, and sends the array down a zeromq "PUSH" 
# connection to be processed by listening workers, in a round robin load 
# balanced fashion. 

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    time.sleep(1) 

# The "worker" functions listen on a zeromq PULL connection for "work" 
# (array to be processed) from the ventilator, get the length of the array 
# and send the results down another zeromq PUSH connection to the results 
# manager. 

def worker(wrk_num): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive work from the ventilator 
    work_receiver = context.socket(zmq.PULL) 
    work_receiver.connect("tcp://127.0.0.1:5557") 

    # Set up a channel to send result of work to the results reporter 
    results_sender = context.socket(zmq.PUSH) 
    results_sender.connect("tcp://127.0.0.1:5558") 

    # Set up a channel to receive control messages over 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Set up a poller to multiplex the work receiver and control receiver channels 
    poller = zmq.Poller() 
    poller.register(work_receiver, zmq.POLLIN) 
    poller.register(control_receiver, zmq.POLLIN) 

    # Loop and accept messages from both channels, acting accordingly 
    while True: 
     socks = dict(poller.poll()) 

     # If the message came from work_receiver channel, get the length 
     # of the array and send the answer to the results reporter 
     if socks.get(work_receiver) == zmq.POLLIN: 
      #work_message = work_receiver.recv_json() 
      work_message = work_receiver.recv_pyobj() 
      length = len(work_message['num'][0]) 
      answer_message = { 'worker' : wrk_num, 'result' : length } 
      results_sender.send_json(answer_message) 

     # If the message came over the control channel, shut down the worker. 
     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Worker %i received FINSHED, quitting!" % wrk_num) 
       break 

# The "results_manager" function receives each result from multiple workers, 
# and prints those results. When all results have been received, it signals 
# the worker processes to shut down. 

def result_manager(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive results 
    results_receiver = context.socket(zmq.PULL) 
    results_receiver.bind("tcp://127.0.0.1:5558") 

    # Set up a channel to send control commands 
    control_sender = context.socket(zmq.PUB) 
    control_sender.bind("tcp://127.0.0.1:5559") 

    for task_nbr in range(np): 
     result_message = results_receiver.recv_json() 
     print "Worker %i answered: %i" % (result_message['worker'], result_message['result']) 

    # Signal to all workers that we are finsihed 
    control_sender.send("FINISHED") 
    time.sleep(5) 

if __name__ == "__main__": 

    # Create a pool of workers to distribute work to 
    for wrk_num in range(pool_size): 
     Process(target=worker, args=(wrk_num,)).start() 

    # Fire up our result manager... 
    result_manager = Process(target=result_manager, args=()) 
    result_manager.start() 

    # Start the ventilator! 
    ventilator = Process(target=ventilator, args=()) 
    ventilator.start() 
+0

tôi đã làm thí nghiệm hơn: giảm number_of_elements đến 64 và tăng STRING_LENGTH đến 6. Mã này vẫn chạy tốt. Trên đó, cùng một triệu chứng xuất hiện. Điều này khiến tôi tin rằng có thể có một giới hạn kích thước tin nhắn tổng thể ở đâu đó trong ràng buộc pyzmq. API 0MQ C có [link] (http://api.zeromq.org/2-1:zmq-msg-init-size) hàm zmq_msg_init_size (3) mà tôi không thể tìm thấy trong tài liệu của pyzmq. Điều này có thể là nguyên nhân? – user183394

+0

Bạn có thể nhận được một traceback nơi nó được treo? Nó có thể cho bạn một gợi ý. –

+0

Tôi đã thử mã của bạn trên máy tính xách tay mac của tôi với string_length = 1024 * 1024 * 4 và nó hoạt động tốt, vì vậy tôi đoán nó phải có một cái gì đó để làm với một số loại tranh luận bộ nhớ. –

Trả lời

6

Vấn đề là lỗ thông khí (PUSH) của bạn đóng trước khi gửi xong. Bạn có một giấc ngủ của 1s ở phần cuối của chức năng thông gió, đó là không đủ để gửi tin nhắn 384MB. Đó là lý do tại sao bạn có ngưỡng bạn có, nếu giấc ngủ ngắn hơn thì ngưỡng sẽ thấp hơn.

Điều đó nói rằng, LINGER là được cho là để ngăn chặn loại điều này, vì vậy tôi sẽ mang lại điều này với zeromq: PUSH không xuất hiện để tôn trọng LINGER.

Sửa lỗi cho ví dụ cụ thể của bạn (không thêm giấc ngủ dài không xác định) sẽ sử dụng cùng một tín hiệu FINISH để chấm dứt máy thở của bạn làm công nhân của bạn. Bằng cách này, bạn đảm bảo rằng máy thở của bạn tồn tại miễn là cần thiết.

Revised thở:

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Set up a channel to receive control messages 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    # Poll for FINISH message, so we don't shutdown too early 
    poller = zmq.Poller() 
    poller.register(control_receiver, zmq.POLLIN) 

    while True: 
     socks = dict(poller.poll()) 

     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Ventilator received FINSHED, quitting!") 
       break 
      # else: unhandled message 
+0

minrk, cảm ơn nhiều câu trả lời sâu sắc. Rất hữu ích! Tôi không nghi ngờ giá trị ZMQ_LINGER được thiết lập bởi zmq_setsockopt (3), vì như bạn đã nói, giá trị Mặc định là -1 (vô hạn). Cú bắt tuyệt vời! Tôi chắc chắn sẽ nêu vấn đề đầu tiên với những người bạn pyzmq và đề cập đến nó trên danh sách gửi thư zeromq. Tôi đã thử nghiệm sửa chữa của bạn tất cả các cách lên đến string_length thiết lập để 1024 * 1024 * 10, maxed ra RAM vật lý của máy tính xách tay của tôi và vẫn có kết quả mong đợi. Cảm ơn một lần nữa! – user183394

+3

Có lẽ không xứng đáng để đưa nó lên với 'pyzmq folks', vì đó là cơ bản của tôi ngay bây giờ. Tôi đã ping libzmq về nó và viết một trường hợp thử nghiệm đơn giản hơn trong C: https://gist.github.com/1643223 – minrk

Các vấn đề liên quan