2017-03-24 19 views
14

gần đây, tôi đã thực hiện nhanh chóng hệ thống xếp hàng của nhà sản xuất/người tiêu dùng.Triển khai hàng đợi bị trì hoãn đối với PHP AMQP

<?php 
namespace Queue; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable;  

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     // First, we need to make sure that RabbitMQ will never lose our queue. 
     // In order to do so, we need to declare it as durable. To do so we pass 
     // the third parameter to queue_declare as true. 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    // Just in case : http://stackoverflow.com/questions/151660/can-i-trust-php-destruct-method-to-be-called 
    // We should call close explicitly if possible. 
    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      // First, we need to make sure that RabbitMQ will never lose our queue. 
      // In order to do so, we need to declare it as durable. To do so we pass 
      // the third parameter to queue_declare as true. 
      $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     // This tells RabbitMQ not to give more than one message to a worker at 
     // a time. 
     $this->channel->basic_qos(null, 1, null); 

     // Requires ack. 
     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback')); 

     while(count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function consumeCallback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     // Very important to ack, in order to remove msg from queue. Ack after 
     // callback, as exception might happen in callback. 
     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 

    public function getQueueSize() 
    { 
     // three tuple containing (<queue name>, <message count>, <consumer count>) 
     $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false); 
     if ($tuple != null && isset($tuple[1])) { 
      return $tuple[1]; 
     } 
     return -1; 
    } 
} 

public function producepublic function consume cặp hoạt động như mong đợi.

Tuy nhiên, khi nó đi kèm với hệ thống hàng đợi chậm

public function produceWithDelaypublic function consume cặp không hoạt động như mong đợi. Người tiêu dùng gọi consume, không thể nhận bất kỳ mặt hàng nào, thậm chí chờ một thời gian.

Tôi tin điều gì đó không đúng với triển khai produceWithDelay của mình. Tôi có thể biết có chuyện gì không?

+0

Cố gắng tuyên bố hàng đợi của bạn như '$ channel-> queue_declare ("tên", false, false, false, true, đúng, array());' và trao đổi có lẽ tiếp theo sau này [ý chính ] (https://gist.github.com/tairov/11289983) – Vardius

+0

không cần phải triển khai nó từ đầu. Đó là cách bạn nên làm điều đó https://stackoverflow.com/a/45549182/579025 –

Trả lời

1

Để ghi chú bên.

Tôi phát hiện ra điều này là do lỗi của chính tôi.

Thay vì

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

tôi nên viết nó trong

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare(delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

biến thành viên của tôi vẫn chưa được khởi tạo đúng cách.

Mã hoàn toàn khả thi như sau, cho mục đích tham khảo của bạn.

<?php 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      $this->channel->queue_declare($delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     $this->channel->basic_qos(null, 1, null); 

     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback')); 

     while (count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function callback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 
} 
3

Nắm tay tất cả xác minh rằng plugin của bạn rabbitmq_delayed_message_exchange được bật bằng cách chạy lệnh: rabbitmq-plugins list, Nếu không - hãy đọc thêm thông tin here.

Và bạn phải cập nhật phương thức __construct vì bạn cần khai báo hàng đợi theo một cách khác. Tôi không giả vờ để cập nhật cấu trúc của bạn, nhưng muốn cung cấp ví dụ đơn giản của tôi:

Declare đợi:

<?php 

require_once __DIR__ . '/../vendor/autoload.php'; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 
$channel = $connection->channel(); 
$args = new AMQPTable(['x-delayed-type' => 'fanout']); 
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); 
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']); 
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args); 
$channel->queue_bind('delayed_queue', 'delayed_exchange'); 

Send message:

$data = 'Hello World at ' . date('Y-m-d H:i:s'); 
$delay = 7000; 
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); 
$headers = new AMQPTable(['x-delay' => $delay]); 
$message->set('application_headers', $headers); 
$channel->basic_publish($message, 'delayed_exchange'); 
printf(' [x] Message sent: %s %s', $data, PHP_EOL); 
$channel->close(); 
$connection->close(); 

Nhận thông điệp:

$callback = function (AMQPMessage $message) { 
    printf(' [x] Message received: %s %s', $message->body, PHP_EOL); 
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); 
}; 
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback); 
while(count($channel->callbacks)) { 
    $channel->wait(); 
} 
$channel->close(); 
$connection->close(); 

Ngoài ra, bạn có thể tìm thấy các tệp nguồn here.
Hy vọng nó sẽ giúp bạn!

Các vấn đề liên quan