2015-06-16 16 views
6

Tôi đã thực hiện ví dụ Last Caching Caching (LVC) của ZMQ (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching), nhưng không thể thuê bao thứ 2 đăng ký ở backend.ZMQ: Không có tin nhắn đăng ký trên ổ cắm XPUB cho nhiều thuê bao (mẫu Caching giá trị cuối cùng)

Lần đầu tiên người đăng ký tham gia, điều kiện event[0] == b'\x01' được đáp ứng và giá trị được lưu trong bộ nhớ cache được gửi, nhưng người đăng ký thứ hai (cùng một chủ đề) thậm chí không đăng ký (if backend in events: là không bao giờ đúng). Mọi thứ khác hoạt động tốt. Dữ liệu được chuyển từ nhà xuất bản tới người đăng ký (tất cả).

Điều gì có thể là lý do cho điều này? Cách phụ trợ được kết nối đúng chưa? Mẫu này chỉ có tác dụng với người đăng ký đầu tiên?

Cập nhật

Khi tôi đăng ký thuê bao thứ 2 đến chủ đề khác, tôi nhận được hành vi đúng (ví dụ: \x01 khi đăng ký). Điều này thực sự có vẻ làm việc cho người đăng ký đầu tiên. Là một lỗi trong ZeroMQ?

Cập nhật 2

Dưới đây là một ví dụ làm việc tối thiểu cho thấy rằng mô hình LVC là không làm việc (ít nhất không phải là cách nó được thực hiện ở đây).

# subscriber.py 
import zmq 

def main(): 
    ctx = zmq.Context.instance() 
    sub = ctx.socket(zmq.SUB) 
    sub.connect("tcp://127.0.0.1:5558") 

    # Subscribe to every single topic from publisher 
    print 'subscribing (sub side)' 
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic") 

    poller = zmq.Poller() 
    poller.register(sub, zmq.POLLIN) 
    while True: 
     try: 
      events = dict(poller.poll(1000)) 
     except KeyboardInterrupt: 
      print("interrupted") 
      break 

     # Any new topic data we cache and then forward 
     if sub in events: 
      msg = sub.recv_multipart() 
      topic, current = msg 
      print 'received %s on topic %s' % (current, topic) 

if __name__ == '__main__': 
    main() 

Và đây là nhà môi giới (như trong ví dụ, nhưng có chút chi tiết và nhà xuất bản tích hợp).

# broker.py 
# from http://zguide.zeromq.org/py:lvcache 
import zmq 
import threading 
import time 


class Publisher(threading.Thread): 
    def __init__(self): 
     super(Publisher, self).__init__() 

    def run(self): 
     time.sleep(10) 
     ctx = zmq.Context.instance() 
     pub = ctx.socket(zmq.PUB) 
     pub.connect("tcp://127.0.0.1:5557") 

     cnt = 0 
     while True: 
      msg = 'hello %d' % cnt 
      print 'publisher is publishing %s' % msg 
      pub.send_multipart(['my-topic', msg]) 
      cnt += 1 
      time.sleep(5) 


def main(): 
    ctx = zmq.Context.instance() 
    frontend = ctx.socket(zmq.SUB) 
    frontend.bind("tcp://*:5557") 
    backend = ctx.socket(zmq.XPUB) 
    backend.bind("tcp://*:5558") 

    # Subscribe to every single topic from publisher 
    frontend.setsockopt(zmq.SUBSCRIBE, b"") 

    # Store last instance of each topic in a cache 
    cache = {} 

    # We route topic updates from frontend to backend, and 
    # we handle subscriptions by sending whatever we cached, 
    # if anything: 
    poller = zmq.Poller() 
    poller.register(frontend, zmq.POLLIN) 
    poller.register(backend, zmq.POLLIN) 


    # launch a publisher 
    p = Publisher() 
    p.daemon = True 
    p.start() 

    while True: 

     try: 
      events = dict(poller.poll(1000)) 
     except KeyboardInterrupt: 
      print("interrupted") 
      break 

     # Any new topic data we cache and then forward 
     if frontend in events: 
      msg = frontend.recv_multipart() 
      topic, current = msg 
      cache[topic] = current 
      backend.send_multipart(msg) 

     ### this is where it fails for the 2nd subscriber. 
     ### There's never even an event from the backend 
     ### in events when the 2nd subscriber is subscribing. 

     # When we get a new subscription we pull data from the cache: 
     if backend in events: 
      print 'message from subscriber' 
      event = backend.recv() 
      # Event is one byte 0=unsub or 1=sub, followed by topic 
      if event[0] == b'\x01': 
       topic = event[1:] 
       print ' => subscribe to %s' % topic 
       if topic in cache: 
        print ("Sending cached topic %s" % topic) 
        backend.send_multipart([ topic, cache[topic] ]) 
      elif event[0] == b'\x00': 
       topic = event[1:] 
       print ' => unsubscribe from %s' % topic 

if __name__ == '__main__': 
    main() 

Chạy mã này (1 x broker.py, 2 x subscriber.py) cho thấy các thuê bao đầu tiên đăng ký tại công ty môi giới như mong đợi (\x01 và tra cứu bộ nhớ cache), nhưng các thuê bao thứ 2 không được đăng ký theo cùng một cách. Điều thú vị là, thuê bao thứ 2 được nối với kênh pub/sub, như sau một thời gian (10 giây) cả hai thuê bao nhận dữ liệu từ nhà xuất bản.

Điều này rất lạ. Có lẽ một số thư viện của tôi đã lỗi thời. Đây là những gì tôi nhận:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import zmq 
>>> zmq.__version__ 
'14.1.1' 

$ brew info zeromq 
zeromq: stable 4.0.5 (bottled), HEAD 
High-performance, asynchronous messaging library 
http://www.zeromq.org/ 
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) * 
    Poured from bottle 
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb 
==> Dependencies 
Build: pkg-config ✔ 
Optional: libpgm ✘, libsodium ✘ 

Cập nhật 3

Hành vi này cũng có thể được quan sát thấy trong zeromq 4.1.2pyzmq-14.7.0 (có hoặc không có libpgm và libsodium cài đặt).

Cập nhật 4

quan sát khác cho thấy rằng các thuê bao đầu tiên là bằng cách nào đó xử lý khác nhau: Các thuê bao đầu tiên là người duy nhất hủy đăng ký theo cách dự kiến ​​từ XPUB ổ cắm (backend) bằng cách đặt trước chủ đề thuê bao của mình với \x00 . Các thuê bao khác (tôi đã thử nhiều hơn 2) ở lại câm trên kênh phụ trợ (mặc dù nhận được tin nhắn).

Cập nhật 5

Tôi hy vọng tôi sẽ không xuống một hang thỏ, nhưng tôi đã nhìn vào czmq bindings và chạy ví dụ Python của tôi trong C.Kết quả là như nhau, vì vậy tôi đoán nó không phải là một vấn đề với các ràng buộc, nhưng với libzmq.

Tôi cũng xác nhận rằng các thuê bao thứ 2 được gửi tin nhắn đăng ký và thực sự tôi có thể thấy điều này trên dây:

Đầu tiên đăng ký:

0000 02 00 00 00 45 00 00 3f 98 be 40 00 40 06 00 00 ....E..? [email protected]@... 
0010 7f 00 00 01 7f 00 00 01 fa e5 15 b6 34 f0 51 c3 ........ ....4.Q. 
0020 05 e4 8b 77 80 18 31 d4 fe 33 00 00 01 01 08 0a ...w..1. .3...... 
0030 2a aa d1 d2 2a aa cd e9 00 09 01 6d 79 2d 74 6f *...*... ...my-to 
0040 70 69 63           pic    

2nd đăng ký thông điệp với chênh lệch (lên trên) đánh dấu và giải thích. Cùng một dữ liệu được gửi trong khung đăng ký.

       identification 
           v 
0000 02 00 00 00 45 00 00 3f ed be 40 00 40 06 00 00 ....E..? [email protected]@... 
          src port  sequence number 
            v  v v v v 
0010 7f 00 00 01 7f 00 00 01 fa e6 15 b6 17 da 02 e7 ........ ........ 

Acknowledgement number window scaling factor 
     v v v v   v 
0020 71 4b 33 e6 80 18 31 d5 fe 33 00 00 01 01 08 0a qK3...1. .3...... 

timestamp value timestamp echo reply 
      v   v v |<-------- data ------- 
0030 2a aa f8 2c 2a aa f4 45 00 09 01 6d 79 2d 74 6f *..,*..E ...my-to 

     ------>| 
0040 70 69 63           pic    

Trả lời

7

Tôi đã tìm ra giải pháp cho vấn đề này và mặc dù tôi đã đọc tài liệu từ trước ra sau và quay lại phía trước, tôi chưa thấy nó. Khóa là XPUB_VERBOSE. Thêm dòng này vào sau khi khởi động phụ trợ và tất cả mọi thứ hoạt động tốt

backend.setsockopt(zmq.XPUB_VERBOSE, True) 

Dưới đây là một trích from the official documentation:

ZMQ_XPUB_VERBOSE: cung cấp tất cả các thư thuê bao trên XPUB socket Thiết lập hành vi XPUB ổ cắm trên đăng ký mới và hủy đăng ký. Giá trị 0 là giá trị mặc định và chỉ chuyển các thông báo đăng ký mới đến . Giá trị 1 vượt qua tất cả các thông báo đăng ký ngược dòng.

giá trị Lựa chọn đơn vị kiểu int Lựa chọn giá trị 0, 1 Giá trị mặc định 0 loại ổ cắm Áp dụng ZMQ_XPUB

Pieter Hintjens có một số chi tiết thông tin về in his blog này. Đây là phần có liên quan:

Một vài tháng trước chúng tôi đã thêm một lựa chọn nhỏ gọn (ZMQ_XPUB_VERBOSE) để XPUB ổ cắm mà vô hiệu hóa lọc của đăng ký trùng lặp. Tính năng này hiện hoạt động với bất kỳ số người đăng ký nào. Chúng tôi sử dụng điều này như sau:

void *publisher = zsocket_new (ctx, ZMQ_XPUB); 
zsocket_set_xpub_verbose (publisher, 1); 
zsocket_bind (publisher, "tcp://*:6001"); 

Các LVC mô tả mô hình cần được cập nhật để phản ánh bối cảnh này, vì mô hình này sẽ không làm việc khác.

Các vấn đề liên quan