2015-01-05 17 views
6

Chúng tôi có một tập lệnh node.js chạy một máy chủ socket.io có các máy khách sử dụng các thông điệp từ hàng đợi RabbitMQ. Gần đây chúng tôi đã di chuyển sang Amazon AWS và RabbitMQ hiện là một cụm gồm hai máy (các phiên bản dự phòng). Kết nối AMQP bị mất theo thời gian (nó là một hạn chế đến từ môi trường sẵn sàng cao với các máy ảo dự phòng và chúng ta phải đối phó với nó) và nếu một nỗ lực kết nối lại được thực hiện, DNS sẽ chọn trường hợp nào để kết nối tới (nó là một cụm sao chép dữ liệu nên không quan trọng đối tượng nào cần kết nối).amqp.node sẽ không phát hiện thấy kết nối bị ngắt

Sự cố là nỗ lực kết nối lại không bao giờ được thực hiện; sau một thời gian, khi kết nối bị mất, amqp.node dường như không nhận thấy rằng kết nối đã bị mất. Ngoài ra, người tiêu dùng ngừng nhận tin nhắn và máy chủ socket.io chỉ dừng việc chấp nhận các kết nối mới.

Chúng tôi có thời gian chờ 55 giây heartbeat (không bị nhầm lẫn với time.io heartbeat timeout) được đặt tại URL RabbitMQ và đang kiểm tra các sự kiện 'lỗi' và 'đóng' với API gọi lại của amqp.node nhưng chúng dường như không bao giờ được ban hành. Hàng đợi mong đợi các tin nhắn được tiêu thụ sẽ được ack'ed. Chúng tôi muốn tập lệnh nút phát hiện một kết nối bị mất và tự hoàn thành, vì vậy môi trường sẽ tự động bắt đầu một quy trình mới và thiết lập lại kết nối.

Đây là mã, có thể chúng tôi đang làm điều gì đó sai với API gọi lại amqp.node hoặc một cái gì đó khác.

var express = require('express'); 
app = express(); 
var http = require('http'); 
var serverio = http.createServer(app); 
var io = require('socket.io').listen(serverio, { log: false }); 
var socket; 
var allcli = []; 
var red, blue, green, magenta, reset; 
red = '\033[31m'; 
blue = '\033[34m'; 
green = '\033[32m'; 
magenta = '\033[35m'; 
orange = '\033[43m'; 
reset = '\033[0m'; 

var queue = 'ha.atualizacao_mobile'; 
var urlRabbit = 'amqp://login:[email protected]?heartbeat=55' // Amazon 
var amqp = require('amqplib/callback_api'); 
var debug = true; 

console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds."); 
io.set('heartbeat interval', 10 * 60); 
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients."); 

console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds."); 
io.set('heartbeat timeout', 11 * 60); 
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds."); 


io.sockets.on('connection', function(socket){ 

    socket.on('error', function (exc) { 
     console.log(orange+"Ignoring exception: " + exc + reset); 
    }); 

    socket.on('send-indice', function (data) { 
     // Some business logic 
    }); 

    socket.on('disconnect', function() { 
     // Some business logic 
    }); 

}); 

function updatecli(data){ 
    // Some business logic 
} 

amqp.connect(urlRabbit, null, function(err, conn) { 
    if (err !== null) { 
     return console.log("Error creating connection: " + err); 
    } 

    conn.on('error', function(err) { 
     console.log("Generated event 'error': " + err); 
    }); 

    conn.on('close', function() { 
     console.log("Connection closed."); 
     process.exit(); 
    }); 

    processRabbitConnection(conn, function() { 
     conn.close(); 
    }); 
}); 

function processRabbitConnection(conn, finalize) { 
    conn.createChannel(function(err, channel) { 

     if (err != null) { 
      console.log("Error creating channel: " + err); 
      return finalize(); 
     } 

     channel.assertQueue(queue, null, function(err, ok) { 
      if (err !== null) { 
        console.log("Error asserting queue " + queue + ": " + err); 
        return finalize(); 
      } 

      channel.consume(queue, function (msg) { 
       if (msg !== null) { 
        try { 
         var dataObj = JSON.parse(msg.content); 
         if (debug == true) { 
          //console.log(dataObj); 
         } 
         updatecli(dataObj); 
        } catch(err) { 
         console.log("Error in JSON: " + err); 
        } 
        channel.ack(msg); 
       } 
      }, null, function(err, ok) { 
       if (err !== null) { 
        console.log("Error consuming message: " + err); 
        return finalize(); 
       } 
      }); 
     }); 
    }); 
} 

serverio.listen(9128, function() { 
    console.log('Server: Socket IO Online - Port: 9128 - ' + new Date()); 
}); 

Trả lời

7

Rõ ràng vấn đề đã được giải quyết. Nhịp tim gần 60 giây là vấn đề. Nó xung đột với cân bằng tải RabbitMQ, kiểm tra cứ sau 1 phút cho dù dữ liệu có đi qua kết nối hay không (nếu không có dữ liệu nào được truyền, nó sẽ ngắt kết nối). Kết nối AMQP ngừng nhận tin nhắn và thư viện dường như không phản ứng với điều đó. Nhịp tim thấp hơn (ví dụ: 30 giây) là cần thiết để tránh tình trạng này.

Các vấn đề liên quan