2011-07-14 42 views
6

Tôi mới sử dụng RabbitMQ. Tôi muốn có thể xử lý các tin nhắn đọc mà không bị chặn khi có nhiều hàng đợi (để đọc từ). Bất kỳ đầu vào về cách tôi có thể làm điều đó?Đọc từ nhiều hàng đợi, RabbitMQ

// Sửa 1

public class Rabbit : IMessageBus 
{ 

    private List<string> publishQ = new List<string>(); 
    private List<string> subscribeQ = new List<string>(); 

    ConnectionFactory factory = null; 
    IConnection connection = null; 
    IModel channel = null; 
    Subscription sub = null; 

    public void writeMessage(Measurement m1) { 
     byte[] body = Measurement.AltSerialize(m1); 
     int msgCount = 1; 
     Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id); 

     string finalQueue = publishToQueue(m1.id); 

     while (msgCount --> 0) { 
      channel.BasicPublish("amq.direct", finalQueue, null, body); 
     } 

     Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id); 
    } 

    public string publishToQueue(string firstQueueName) { 
     Console.WriteLine("Creating a queue and binding it to amq.direct"); 
     string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null); 
     channel.QueueBind(queueName, "amq.direct", queueName, null); 
     Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName); 
     return queueName; 
    } 


    public Measurement readMessage() { 
     Console.WriteLine("Receiving message..."); 
     Measurement m = new Measurement(); 

     int i = 0; 
     foreach (BasicDeliverEventArgs ev in sub) { 
      m = Measurement.AltDeSerialize(ev.Body); 
      //m.id = //get the id here, from sub 
      if (++i == 1) 
       break; 
      sub.Ack(); 
     } 

     Console.WriteLine("Done.\n"); 
     return m; 
    } 


    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
    } 

    public static string MsgSysName; 
    public string MsgSys 
    { 
     get 
     { 
      return MsgSysName; 
     } 
     set 
     { 
      MsgSysName = value; 
     } 
    } 

    public Rabbit(string _msgSys) //Constructor 
    { 
     factory = new ConnectionFactory(); 
     factory.HostName = "localhost"; 
     connection = factory.CreateConnection(); 
     channel = connection.CreateModel(); 
     //consumer = new QueueingBasicConsumer(channel); 

     System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
     MsgSys = _msgSys; 
    } 

    ~Rabbit() 
    { 
     //observer?? 
     connection.Dispose(); 
     //channel.Dispose(); 
     System.Console.WriteLine("\nDestroying RABBIT"); 
    } 
} 

// Sửa 2

private List<Subscription> subscriptions = new List<Subscription>(); 
    Subscription sub = null; 

public Measurement readMessage() 
    { 
     Measurement m = new Measurement(); 
     foreach(Subscription element in subscriptions) 
     { 
      foreach (BasicDeliverEventArgs ev in element) { 
       //ev = element.Next(); 
       if(ev != null) { 
        m = Measurement.AltDeSerialize(ev.Body); 
        return m; 
       } 
       m = null; 
      }   
     } 
     System.Console.WriteLine("No message in the queue(s) at this time."); 
     return m; 
    } 

    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
     subscriptions.Add(sub);  
    } 

// Sửa 3

//MessageHandler.cs 

public class MessageHandler 
{ 
    // Implementation of methods for Rabbit class go here 
    private List<string> publishQ = new List<string>(); 
    private List<string> subscribeQ = new List<string>(); 

    ConnectionFactory factory = null; 
    IConnection connection = null; 
    IModel channel = null; 
    QueueingBasicConsumer consumer = null; 

    private List<Subscription> subscriptions = new List<Subscription>(); 
    Subscription sub = null; 

    public void writeMessage (Measurement m1) 
    { 
     byte[] body = Measurement.AltSerialize(m1); 
     //declare a queue if it doesn't exist 
     publishToQueue(m1.id); 

     channel.BasicPublish("amq.direct", m1.id, null, body); 
     Console.WriteLine("\n [x] Sent to queue {0}.", m1.id); 
    } 

    public void publishToQueue(string queueName) 
    { 
     string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null); 
     channel.QueueBind(finalQueueName, "amq.direct", "", null); 
    } 

    public Measurement readMessage() 
    { 
     Measurement m = new Measurement(); 
     foreach(Subscription element in subscriptions) 
     { 
      if(element.QueueName == null) 
      { 
       m = null; 
      } 
      else 
      { 
       BasicDeliverEventArgs ev = element.Next(); 
       if(ev != null) { 
        m = Measurement.AltDeSerialize(ev.Body); 
        m.id = element.QueueName; 
        element.Ack(); 
        return m; 
       } 
       m = null;      
      } 
      element.Ack(); 
     } 
     System.Console.WriteLine("No message in the queue(s) at this time."); 
     return m; 
    } 

    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
     subscriptions.Add(sub); 
    } 

    public static string MsgSysName; 
    public string MsgSys 
    { 
     get 
     { 
      return MsgSysName; 
     } 
     set 
     { 
      MsgSysName = value; 
     } 
    } 

    public MessageHandler(string _msgSys) //Constructor 
    { 
     factory = new ConnectionFactory(); 
     factory.HostName = "localhost"; 
     connection = factory.CreateConnection(); 
     channel = connection.CreateModel(); 
     consumer = new QueueingBasicConsumer(channel); 

     System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
     MsgSys = _msgSys; 
    } 

    public void disposeAll() 
    { 
     connection.Dispose(); 
     channel.Dispose(); 
     foreach(Subscription element in subscriptions) 
     { 
      element.Close(); 
     } 
     System.Console.WriteLine("\nDestroying RABBIT"); 
    } 
} 

//App1.cs

using System; 
using System.IO; 

using UtilityMeasurement; 
using UtilityMessageBus; 


public class MainClass 
{ 
    public static void Main() 
    { 

    MessageHandler obj1 = MessageHandler("Rabbit"); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    //Create new Measurement messages 
    Measurement m1 = new Measurement("q1", 2345, 23.456); 
    Measurement m2 = new Measurement("q2", 222, 33.33); 

    System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id); 
    System.Console.WriteLine(" Time: {0}", m1.time); 
    System.Console.WriteLine(" Value: {0}", m1.value); 

    System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id); 
    System.Console.WriteLine(" Time: {0}", m2.time); 
    System.Console.WriteLine(" Value: {0}", m2.value); 

    // Ask queue name and store it 
    System.Console.WriteLine("\nName of queue to publish to: "); 
    string queueName = (System.Console.ReadLine()).ToString(); 
    obj1.publishToQueue(queueName); 

    // Write message to the queue 
    obj1.writeMessage(m1);  

    System.Console.WriteLine("\nName of queue to publish to: "); 
    string queueName2 = (System.Console.ReadLine()).ToString(); 
    obj1.publishToQueue(queueName2); 

    obj1.writeMessage(m2); 

    obj1.disposeAll(); 
} 
} 

//App2.cs

using System; 
using System.IO; 

using UtilityMeasurement; 
using UtilityMessageBus; 

public class MainClass 
{ 
    public static void Main() 
    { 
    //Asks for the message system 
    System.Console.WriteLine("\nEnter name of messageing system: "); 
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]"); 
    string MsgSysName = (System.Console.ReadLine()).ToString(); 

    //Declare an IMessageBus instance: 
    //Here, an object of the corresponding Message System 
     // (ex. Rabbit, Zmq, etc) is instantiated 
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    //Create a new Measurement object m 
    Measurement m = new Measurement(); 

    System.Console.WriteLine("Queue name to subscribe to: "); 
    string QueueName1 = (System.Console.ReadLine()).ToString(); 
    obj1.subscribeToQueue(QueueName1); 

    //Read message into m 
    m = obj1.readMessage(); 

    if (m != null) { 
     System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); 
     System.Console.WriteLine(" Time: {0}", m.time); 
     System.Console.WriteLine(" Value: {0}", m.value); 
    } 

    System.Console.WriteLine("Another queue name to subscribe to: "); 
    string QueueName2 = (System.Console.ReadLine()).ToString(); 
    obj1.subscribeToQueue(QueueName2); 

    m = obj1.readMessage(); 

    if (m != null) { 
     System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); 
     System.Console.WriteLine(" Time: {0}", m.time); 
     System.Console.WriteLine(" Value: {0}", m.value); 
    } 

    obj1.disposeAll(); 
} 
} 

Trả lời

12

hai nguồn thông tin:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. Bạn thực sự nên cố gắng tìm hiểu các ví dụ đầu tiên.

    • % Program Files% \ RabbitMQ \ DotNetClient \ ví dụ \ src (ví dụ cơ bản)

    • lấy ví dụ làm việc đầy đủ từ kho Mercurial của họ (C# dự án).

hoạt động hữu ích để hiểu:

  • Declare/Khẳng định/Nghe/Theo dõi/Xuất bản

Re: câu hỏi của bạn - không có lý do tại sao bạn có thể' t có nhiều danh sách. Hoặc bạn có thể đăng ký để n đường dẫn định tuyến với một người nghe trên một "trao đổi".

** lại: không chặn **

Một người nghe điển hình tiêu thụ từng thư một. Bạn có thể kéo chúng ra khỏi hàng đợi, hoặc chúng sẽ tự động được đặt gần với người tiêu dùng theo kiểu 'cửa sổ' (được xác định thông qua chất lượng của các tham số qos dịch vụ). Vẻ đẹp của phương pháp tiếp cận là rất nhiều công việc khó khăn được thực hiện cho bạn (lại: độ tin cậy, đảm bảo giao hàng, vv).

Một tính năng chính của RabbitMQ, là nếu có lỗi trong quá trình xử lý, thì thông báo sẽ được thêm lại vào hàng đợi (tính năng dung sai lỗi).

Cần biết thêm về tình huống của bạn.

Thường thì nếu bạn đăng lên danh sách mà tôi đã đề cập ở trên, bạn có thể nắm giữ ai đó trong nhân viên tại RabbitMQ. Chúng rất hữu ích.

Hy vọng rằng sẽ giúp ích một chút. Đầu tiên, đầu tiên của bạn là rất nhiều, nhưng nó rất đáng để tồn tại.


Q & Một

see: http://www.rabbitmq.com/faq.html

Q. Bạn có thể đăng ký nhiều hàng đợi sử dụng đăng ký mới (kênh, queueName)?

Có. Bạn có thể sử dụng khóa ràng buộc, ví dụ: abc. *. hij, hoặc abc. #. hij, hoặc bạn đính kèm nhiều ràng buộc. Trước đây, giả định rằng bạn đã thiết kế các khóa định tuyến của mình xung quanh một số nguyên tắc có ý nghĩa đối với bạn (xem các phím định tuyến trong Câu hỏi thường gặp). Đối với sau này, bạn cần phải liên kết với nhiều hơn một hàng đợi.

Thực hiện n-bindings theo cách thủ công. xem: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

không có nhiều mã đằng sau mẫu này, vì vậy bạn có thể cuộn mẫu đăng ký của riêng mình nếu ký tự đại diện không đủ. bạn có thể kế thừa từ lớp này và thêm một phương thức khác cho các ràng buộc bổ sung ... có lẽ điều này sẽ làm việc hoặc một cái gì đó gần với điều này (chưa được kiểm tra).

Các AQMP spec nói rằng nhiều nhãn ràng buộc có thể xảy ra: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

Q. And if so, how can I go through all the subscribed queues and return a message (null when no messages)?

Với một thuê bao bạn được thông báo khi một tin nhắn có sẵn. Nếu không, những gì bạn mô tả là một giao diện kéo nơi bạn kéo tin nhắn xuống theo yêu cầu. Nếu không có thông báo nào, bạn sẽ nhận được một giá trị rỗng như bạn muốn. btw: phương thức Notify có lẽ thuận tiện hơn.

Q. Oh, and mind you that that I have all this operations in different methods. I will edit my post to reflect the code

đang Live:

phiên bản này phải sử dụng thẻ hoang dã để đăng ký nhiều hơn một tuyến quan trọng

n tay phím định tuyến sử dụng thuê bao là trái như một bài tập cho người đọc. ;-) Tôi nghĩ rằng bạn đang nghiêng về phía một giao diện kéo anyway. btw: các giao diện kéo kém hiệu quả hơn các giao diện thông báo.

 using (Subscription sub = new Subscription(ch, QueueNme)) 
     { 
      foreach (BasicDeliverEventArgs ev in sub) 
      { 
       Process(ev.Body); 

     ... 

Lưu ý: foreach sử dụng IEnumerable, và IEnumerable kết thúc tốt đẹp các sự kiện mà một tin nhắn mới đã đến thông qua "năng suất" tuyên bố. Hiệu quả nó là một vòng lặp vô hạn.

--- CẬP NHẬT

AMQP được thiết kế với ý tưởng của việc giữ số lượng kết nối TCP thấp như số lượng các ứng dụng, do đó có nghĩa bạn có thể có nhiều kênh cho mỗi kết nối.

mã trong câu hỏi này (chỉnh sửa 3) cố gắng sử dụng hai người đăng ký với một kênh, trong khi đó (tôi tin), là một người đăng ký trên mỗi kênh cho mỗi chủ đề để tránh các vấn đề về khóa. Sugestion: sử dụng khóa định tuyến "ký tự đại diện". Có thể đăng ký nhiều hơn một tên hàng đợi riêng biệt với máy khách java, nhưng máy khách .net không có kiến ​​thức của tôi đã thực hiện điều này trong lớp trình trợ giúp người đăng ký.

Nếu bạn thực sự cần hai tên hàng đợi riêng biệt trên cùng một chuỗi đăng ký, thì trình tự kéo sau đây được đề xuất cho.net:

 using (IModel ch = conn.CreateModel()) { // btw: no reason to close the channel afterwards IMO 
      conn.AutoClose = true;     // no reason to closs the connection either. Here for completeness. 

      ch.QueueDeclare(queueName); 
      BasicGetResult result = ch.BasicGet(queueName, false); 
      if (result == null) { 
       Console.WriteLine("No message available."); 
      } else { 
       ch.BasicAck(result.DeliveryTag, false); 
       Console.WriteLine("Message:"); 
      } 

      return 0; 
     } 

- CẬP NHẬT 2:.

từ danh sách RabbitMQ:

"cho rằng element.Next() được chặn trên một trong các mục đăng ký Bạn có thể lấy việc giao hàng từ mỗi thuê bao với một thời gian chờ để đọc qua nó. Hoặc bạn có thể thiết lập một hàng đợi duy nhất để nhận tất cả các phép đo và truy xuất thư từ đó bằng một đăng ký duy nhất. " (Emile)

Điều đó có nghĩa là khi hàng đợi đầu tiên trống, các khối .Next() đang chờ thông báo tiếp theo xuất hiện. ví dụ: các thuê bao có một chờ đợi-cho-next-tin được xây dựng trong

- CẬP NHẬT 3:.

dưới .net, sử dụng QueueingBasicConsumer phục vụ tiêu dùng từ nhiều hàng đợi.

Trên thực tế đây là một chủ đề về nó để có được một cảm giác về cách sử dụng:

Wait for a single RabbitMQ message with a timeout

- UPDATE4:

một số thông tin thêm về các .QueueingBasicConsumer

Có dụ mã ở đây.

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

dụ sao chép vào câu trả lời với một vài sửa đổi (xem // < -----).

   IModel channel = ...; 
      QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
      channel.BasicConsume(queueName, false, null, consumer); //<----- 
      channel.BasicConsume(queueName2, false, null, consumer); //<----- 
      // etc. channel.BasicConsume(queueNameN, false, null, consumer); //<----- 

      // At this point, messages will be being asynchronously delivered, 
      // and will be queueing up in consumer.Queue. 

      while (true) { 
       try { 
        BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue(); 
        // ... handle the delivery ... 
        channel.BasicAck(e.DeliveryTag, false); 
       } catch (EndOfStreamException ex) { 
        // The consumer was cancelled, the model closed, or the 
        // connection went away. 
        break; 
       } 
      } 

- CẬP NHẬT 5: hành động đơn giản sẽ hoạt động trên bất kỳ hàng đợi nào (phương thức chậm hơn nhưng đôi khi thuận tiện hơn).

  ch.QueueDeclare(queueName); 
      BasicGetResult result = ch.BasicGet(queueName, false); 
      if (result == null) { 
       Console.WriteLine("No message available."); 
      } else { 
       ch.BasicAck(result.DeliveryTag, false); 
       Console.WriteLine("Message:"); 
       // deserialize body and display extra info here. 
      } 
+0

Cảm ơn rất nhiều phản hồi của bạn. Tôi vẫn đang học hệ thống tin nhắn và có những hoạt động mà tôi vẫn chưa hiểu. Thích nghe. Tôi cũng đã xem cách thỏmq đăng ký hàng đợi. Bạn có thể đăng ký nhiều hàng đợi bằng cách sử dụng Đăng ký mới (channel, queueName) không? Và nếu vậy, làm thế nào tôi có thể đi qua tất cả các hàng đợi đăng ký và trả lại một tin nhắn (null khi không có tin nhắn)? Oh, và nhớ bạn rằng tôi có tất cả các hoạt động này trong các phương pháp khác nhau. Tôi sẽ chỉnh sửa bài đăng của mình để phản ánh mã. – Demi

+0

Cảm ơn bạn lần nữa. Tôi đã chỉnh sửa mã cho các chức năng đăng ký và viết ở trên. Tuy nhiên, tôi có lỗi thời gian chạy này: nếu tôi đăng ký để nói hai hàng đợi và cố gắng đọc tin nhắn tôi chỉ có thể truy xuất lại tin nhắn lần đầu tiên. Tôi không thể nhìn thấy nơi tôi đã làm hỏng nó. Bạn có thể xem nếu tôi? – Demi

+0

@Demi ... có một số săn bắn. Tôi nghĩ bạn đang bỏ lỡ "subscriptions.Ack()" ở cuối vòng đọc của bạn? Điều đó có nghĩa là 'Tôi đã xử lý thành công thông báo này, vì vậy hãy cho tôi thư tiếp theo.' Hãy cho tôi biết nếu đó là nó. Nếu không, bạn nhìn gần. – sgtz

1

Cách dễ nhất là sử dụng EventingBasicConsumer. Tôi có một ví dụ trên trang web của tôi về cách sử dụng nó. RabbitMQ EventingBasicConsumer

Lớp người tiêu dùng này cho thấy sự kiện đã nhận mà bạn có thể sử dụng và do đó KHÔNG chặn. Phần còn lại của mã về cơ bản vẫn như cũ.

Các vấn đề liên quan