2015-07-12 15 views
11

Tôi tự hỏi tại sao RPC-Client RabbitMQ của tôi luôn xử lý các thông báo đã chết sau khi khởi động lại. _channel.QueueDeclare(queue, false, false, false, null); nên tắt bộ đệm. Nếu tôi quá tải QueueDeclare bên trong RPC-Client, tôi không thể kết nối với máy chủ. Có gì sai ở đây không? Bất kỳ ý tưởng làm thế nào để khắc phục vấn đề này?Hàng đợi có độ dài RabbitMQ không hoạt động (RPC-Server, RPC-Client)


RPC-Server

new Thread(() => 
{ 
    var factory = new ConnectionFactory { HostName = _hostname }; 
    if (_port > 0) 
     factory.Port = _port; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 

    _channel.QueueDeclare(queue, false, false, false, null); 
    _channel.BasicQos(0, 1, false); 
    var consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(queue, false, consumer); 
    IsRunning = true; 
    while (IsRunning) 
    { 
     BasicDeliverEventArgs ea; 
     try { 
      ea = consumer.Queue.Dequeue(); 
     } 
     catch (Exception ex) { 
      IsRunning = false; 
     } 
     var body = ea.Body; 
     var props = ea.BasicProperties; 
     var replyProps = _channel.CreateBasicProperties(); 
     replyProps.CorrelationId = props.CorrelationId; 

     var xmlRequest = Encoding.UTF8.GetString(body); 

     var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message; 
     var messageResponse = handler(messageRequest); 

     _channel.BasicPublish("", props.ReplyTo, replyProps, 
           messageResponse); 
     _channel.BasicAck(ea.DeliveryTag, false); 
    } 
}).Start(); 

RPC-Client

public void Start() 
{ 
    if (IsRunning) 
     return; 
    var factory = new ConnectionFactory { 
     HostName = _hostname, 
     Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint) 
           : new AmqpTcpEndpoint(_endpoint, _port) 
    }; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 
    _replyQueueName = _channel.QueueDeclare(); // Do not connect any more 
    _consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(_replyQueueName, true, _consumer); 
    IsRunning = true; 
} 

public Message Call(Message message) 
{ 
    if (!IsRunning) 
     throw new Exception("Connection is not open."); 
    var corrId = Guid.NewGuid().ToString().Replace("-", ""); 
    var props = _channel.CreateBasicProperties(); 
    props.ReplyTo = _replyQueueName; 
    props.CorrelationId = corrId; 

    if (!String.IsNullOrEmpty(_application)) 
     props.AppId = _application; 

    message.InitializeProperties(_hostname, _nodeId, _uniqueId, props); 

    var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message)); 
    _channel.BasicPublish("", _queue, props, messageBytes); 

    try 
    { 
     while (IsRunning) 
     { 
      var ea = _consumer.Queue.Dequeue(); 
      if (ea.BasicProperties.CorrelationId == corrId) 
      { 
       var xmlResponse = Encoding.UTF8.GetString(ea.Body); 
       try 
       { 
        return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message; 
       } 
       catch(Exception ex) 
       { 
        IsRunning = false; 
        return null; 
       } 
      } 
     } 
    } 
    catch (EndOfStreamException ex) 
    { 
     IsRunning = false; 
     return null; 
    } 
    return null; 
} 

Trả lời

6

Hãy thử thiết lập thuộc tính DeliveryMode để không liên tục (1) trong RPC- của bạn Mã khách hàng như sau:

public Message Call(Message message) 
{ 
    ... 
    var props = _channel.CreateBasicProperties(); 
    props.DeliveryMode = 1; //you might want to do this in your RPC-Server as well 
    ... 
} 

AMQP Model Explained chứa các tài nguyên rất hữu ích, như giải thích cách xử lý thư kết thúc trong hàng đợi thư chết.

Một lưu ý hữu ích từ các tài liệu liên quan đến xếp hàng độ bền:

hàng đợi lâu bền được tiếp tục tồn vào đĩa và do đó tồn khởi động lại nhà môi giới. Hàng đợi không bền được gọi là thoáng qua. Không phải tất cả các trường hợp và trường hợp sử dụng ủy quyền hàng đợi là bền.

Độ bền của hàng đợi không làm cho thư được định tuyến đến hàng đợi đó có độ bền là . Nếu nhà môi giới bị gỡ xuống và sau đó được sao lưu, hàng đợi lâu sẽ được khai báo lại trong khi khởi động nhà môi giới, tuy nhiên, chỉ tin nhắn liên tục mới được khôi phục.

Lưu ý rằng nó nói về hãy khởi động lại nhà môi giới không phải nhà xuất bản hoặc người tiêu dùng khởi động lại.

+0

Dịch vụ này có trợ giúp @ MR.ABC không? –

Các vấn đề liên quan