2013-07-15 54 views
11

Tôi đã tạo một nhà xuất bản đơn giản và người tiêu dùng đăng ký trên hàng đợi bằng cách sử dụng basic.consume.Tiêu thụ không xác nhận tin nhắn từ RabbitMq

Người tiêu dùng của tôi thừa nhận thông báo khi công việc chạy mà không có ngoại lệ. Bất cứ khi nào tôi gặp một ngoại lệ, tôi không nhắn tin và quay lại sớm. Chỉ những thông báo được xác nhận biến mất khỏi hàng đợi, vì vậy các thông điệp đó hoạt động chính xác.
Bây giờ tôi muốn người tiêu dùng nhận lại các thông báo không thành công, nhưng cách duy nhất để xem xét lại các tin nhắn đó là bằng cách khởi động lại người tiêu dùng.

Tôi cần phải tiếp cận trường hợp sử dụng này như thế nào?

mã thiết lập

$channel = new AMQPChannel($connection); 

$exchange = new AMQPExchange($channel); 

$exchange->setName('my-exchange'); 
$exchange->setType('fanout'); 
$exchange->declare(); 

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declare(); 
$queue->bind('my-exchange'); 

đang Consumer

$queue->consume(array($this, 'callback')); 

public function callback(AMQPEnvelope $msg) 
{ 
    try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return; 
    } 
    return $queue->ack($msg->getDeliveryTag()); 
} 

đang sản xuất

$exchange->publish('message'); 
+0

Bạn sử dụng ngôn ngữ nào và bạn có thể cung cấp một số mã không? – pinepain

+0

@ zaq178miami, xem tin nhắn đã chỉnh sửa của tôi –

+0

@Bram_Gerritsen, xem cập nhật câu trả lời của tôi – pinepain

Trả lời

15

Nếu nhắn w vì không được công nhận và ứng dụng không thành công, nó sẽ được phân phối lại tự động và redelivered thuộc tính trên phong bì sẽ được đặt thành true (trừ khi bạn tiêu thụ chúng với cờ no-ack = true).

UPD:

Bạn phải nack nhắn với trả tàu cờ trong khối catch của bạn

try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); 
    } 

Hãy coi chừng các thông điệp vô cùng nacked trong khi đếm trả tàu không thực hiện trong RabbitMQ và trong giao thức AMQP ở tất cả.

Nếu bạn không muốn gây rối với những thông điệp đó và chỉ đơn giản là muốn thêm một số chậm trễ bạn có thể muốn thêm một số sleep() hoặc usleep() trước nack gọi phương thức, nhưng nó không phải là một ý tưởng tốt ở tất cả.

Có nhiều kỹ thuật để đối phó với vấn đề chu kỳ Phân phối lại:

1. Dựa vào Dead Letter Exchanges

  • ưu: đáng tin cậy, tiêu chuẩn, rõ ràng
  • khuyết điểm: yêu cầu logic thêm

2.Sử dụng per message or per queue TTL

  • ưu: dễ thực hiện, cũng chuẩn, rõ ràng
  • nhược điểm: với hàng đợi lâu bạn có thể mất một số điệp

Ví dụ (lưu ý, rằng cho hàng đợi ttl chúng tôi vượt qua chỉ số và cho ttl nhắn - bất cứ điều gì mà sẽ là chuỗi số):

2,1 mỗi ttl nhắn:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'expiration' => '1000' 
    ) 
); 

2.2. Mỗi hàng đợi ttl:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->setArgument('x-message-ttl', 1000); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish('message at ' . microtime(true)); 

3. Giữ redelivers đếm hoặc trái số redelivers (aka hop hạn chế hoặc ttl trong ngăn xếp IP) trong nội dung thư hoặc tiêu đề

  • ưu: cung cấp cho bạn kiểm soát thêm trên tin nhắn trọn đời trên cấp ứng dụng
  • điểm: chi phí đáng kể trong khi bạn phải sửa đổi thư và xuất bản lại, ứng dụng cụ thể, không rõ ràng

Code:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'headers' => array(
      'ttl' => 100 
     ) 
    ) 
); 

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { 
     $headers = $msg->getHeaders(); 
     echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; 
     echo $msg->getDeliveryTag(), ' '; 
     echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; 
     echo $msg->getBody(), PHP_EOL; 

     try { 
      //Do some business logic 
      throw new Exception('business logic failed'); 
     } catch (Exception $ex) { 
      //Log exception 
      if (isset($headers['ttl'])) { 
       // with ttl logic 

       if ($headers['ttl'] > 0) { 
        $headers['ttl']--; 

        $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); 
       } 

       return $queue->ack($msg->getDeliveryTag()); 
      } else { 
       // without ttl logic 
       return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue 
      } 

     } 

     return $queue->ack($msg->getDeliveryTag()); 
    } 
); 

Có thể có một số cách khác để kiểm soát tốt hơn thông điệp redelivers dòng chảy.

Kết luận: không có giải pháp viên đạn bạc. Bạn phải quyết định giải pháp nào phù hợp với nhu cầu của bạn tốt nhất hoặc tìm ra thứ gì đó khác, nhưng đừng quên chia sẻ nó ở đây;)

+0

Cảm ơn câu trả lời của bạn. 'redelivered' thực sự được đặt thành' true', nhưng tôi phải khởi động lại người dùng chặn để xem lại tin nhắn. –

+0

Cảm ơn, đây chính xác là những gì tôi cần. Bạn có thể cung cấp cho tôi một số chỉ đường/đề xuất về cách ngăn các thư được gửi lại vô hạn không? Nó sẽ là tốt đẹp nếu tôi có thể trì hoãn việc requeing đến hàng đợi bởi một số tiền nhất định của thứ hai, vì vậy tôi không quá tải máy chủ tiêu thụ của tôi. –

+0

ở đây bạn đi, cập nhật câu trả lời một lần nữa – pinepain

0

Nếu bạn không muốn khởi động lại người tiêu dùng, thì basic.recover lệnh AMQP có thể là những gì bạn muốn. Theo số AMQP protocol:

basic.recover(bit requeue) 

Redeliver unacknowledged messages. 

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
+0

Phương pháp này dường như không phải là một phần của API ứng dụng khách mà tôi đang sử dụng. http://www.php.net/manual/en/book.amqp.php –

+1

RabbitMQ có hỗ trợ một phần phương pháp này, xem [doc chính thức về nó] (https://www.rabbitmq.com/specification.html# phương thức-trạng thái-basic.recover) – pinepain

Các vấn đề liên quan