2016-11-10 13 views
6

Chúng tôi có một nhà môi giới ActiveMQ được kết nối từ các khách hàng rất khác nhau bằng cách sử dụng JMS, AMQP và MQTT. Vì một lý do nào đó, chúng tôi chưa tìm ra một tập hợp cụ thể các khách hàng MQTT thường xuyên (không phải luôn luôn) đăng ký lâu dài. Đây là một môi trường thử nghiệm mà khách hàng được thêm vào và loại bỏ khá thường xuyên, sau này đôi khi bằng cách kéo các plug hoặc khởi động lại một thiết bị nhúng, để họ không thể hủy đăng ký đúng cách. Hiệu ứng (IIUC) là nhà môi giới đã đăng ký "đăng ký có độ bền cao" cho các thiết bị mà nó không bao giờ có thể nhìn thấy nữa (tôi có thể thấy những điều này dưới http://my_broker:8161/admin/subscribers.jsp), lưu giữ tin nhắn trên các chủ đề đó mãi mãi cho đến khi nó bị hỏng. .Làm cách nào để nhà môi giới ActiveMQ của tôi thả người đăng ký có độ bền cao ngoại tuyến

Vấn đề ở đây là người đăng ký có thể đăng ký lâu dài và chúng tôi cần tìm hiểu lý do. Tuy nhiên, nó cũng đã được quyết định rằng khách hàng làm điều này (vô tình) không nên mang lại cho các nhà môi giới để dừng nghiền, vì vậy chúng tôi cần phải giải quyết vấn đề này một cách độc lập.

Tôi đã tìm thấy there are settings for a timeout for offline durable subscriptions và đưa những thành cấu hình môi giới của chúng tôi (hai dòng cuối):

<broker 
    xmlns="http://activemq.apache.org/schema/core" 
    brokerName="my_broker" 
    dataDirectory="${activemq.data}" 
    useJmx="true" 
    advisorySupport="false" 
    persistent="false" 
    offlineDurableSubscriberTimeout="1800000" 
    offlineDurableSubscriberTaskSchedule="60000"> 

Nếu tôi hiểu đúng, bên trên nên kiểm tra mỗi phút và bỏ khách hàng nó đã không nhìn thấy cho nửa giờ. Tuy nhiên, trái với các tài liệu, điều này dường như không hoạt động: Một người tiêu dùng tôi đã đăng ký và sau đó rút phích cắm vào ngày trước vẫn còn nhìn thấy trong danh sách các thuê bao bền offline, dấu chân bộ nhớ của nhà môi giới liên tục tăng, và nếu tôi xóa người đăng ký theo cách thủ công trong giao diện web của nhà môi giới tôi có thể thấy dấu chân bộ nhớ bị hỏng.

Vì vậy, đây là câu hỏi của tôi:

  1. Điều gì quyết định liệu một thuê bao MQTT đến một chủ đề trên một môi giới ActiveMQ là bền?
  2. Tôi đang làm gì sai khi thiết lập thời gian chờ để bỏ đăng ký vĩnh viễn trong cài đặt ActiveMQ?
+0

Bạn đã thử cách khác xung quanh, bằng việc xuất bản các thông điệp với thời gian TTL ngắn (time to live) và bằng cách cấu hình một đoạn ngắn ** expireMessagesPeriod **? Theo tài liệu, với cấu hình này, hệ thống phải thanh lọc tất cả các thông báo như vậy sau khi hết thời gian TTL, không kể đến các thuê bao lâu dài bị mất (người không hủy đăng ký).Điều đó cũng sẽ giúp chúng tôi giải phóng tài nguyên bộ nhớ, vì bộ nhớ thực tế được tiêu thụ là để lưu trữ "tin nhắn" và không lưu trữ đối tượng người đăng ký. – blackpen

Trả lời

2

Tôi đã trích xuất mã có liên quan (doCleanup()) để xóa các đăng ký có độ dài đã hết hạn.

Trong trường hợp thành công, nó thực thi:

LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 

Trong trường hợp thất bại, nó thực thi:

LOG.error("Failed to remove inactive durable subscriber", e); 

Hãy tìm dòng nhật ký trên trong tập tin đăng nhập của bạn và kết hợp nó với các chi tiết mà bạn quan sát sử dụng admin/users.jsp viewer. Nếu nó không in bất kỳ dòng nào, các đăng ký có thể còn lại active vì một số lý do hoặc bạn có thể đã vấp phải một lỗi.

Ngoài ra, bạn có thể thử xóa dấu gạch dưới (_) trong tên nhà môi giới nếu bạn có thể không? Hướng dẫn sử dụng nói về các vấn đề với dấu gạch dưới trong tên nhà môi giới.

Code:

public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 
    super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 
    if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { 
     this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); 
     this.cleanupTask = new TimerTask() { 
     @Override 
     public void run() { 
      doCleanup(); 
     } 
     }; 
     this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule()); 
    } 
} 

public void doCleanup() { 
    long now = System.currentTimeMillis(); 
    for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { 
     DurableTopicSubscription sub = entry.getValue(); 
     if (!sub.isActive()) { 
     long offline = sub.getOfflineTimestamp(); 
     if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { 
      LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 
      try { 
       RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 
       info.setClientId(entry.getKey().getClientId()); 
       info.setSubscriptionName(entry.getKey().getSubscriptionName()); 
       ConnectionContext context = new ConnectionContext(); 
       context.setBroker(broker); 
       context.setClientId(entry.getKey().getClientId()); 
       removeSubscription(context, info); 
      } catch (Exception e) { 
       LOG.error("Failed to remove inactive durable subscriber", e); 
      } 
     } 
     } 
    } 
} 

// The toString method for DurableTopicSubscription class 
@Override 
public synchronized String toString() { 
    return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); 
} 
+0

Cảm ơn bạn đã trả lời câu hỏi này. Tùy chọn hỗ trợ chuyên nghiệp của chúng tôi được bật lên với hai sự kiện: _1) _ Có thực sự là một lỗi trong ActiveMQ nơi không thể xóa người đăng ký có độ bền cao ngoại tuyến. _2) _ Để đăng ký không-durably từ MQTT, bạn cần kết nối với 'cleanSession' được đặt thành' true' và với QoS <1. – sbi

Các vấn đề liên quan