Tôi mới sử dụng RabbitMQ. Tôi muốn có thể xử lý các tin nhắn đọc mà không bị chặn khi có nhiều hàng đợi (để đọc từ). Bất kỳ đầu vào về cách tôi có thể làm điều đó?Đọc từ nhiều hàng đợi, RabbitMQ
// Sửa 1
public class Rabbit : IMessageBus
{
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();
ConnectionFactory factory = null;
IConnection connection = null;
IModel channel = null;
Subscription sub = null;
public void writeMessage(Measurement m1) {
byte[] body = Measurement.AltSerialize(m1);
int msgCount = 1;
Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);
string finalQueue = publishToQueue(m1.id);
while (msgCount --> 0) {
channel.BasicPublish("amq.direct", finalQueue, null, body);
}
Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
}
public string publishToQueue(string firstQueueName) {
Console.WriteLine("Creating a queue and binding it to amq.direct");
string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
channel.QueueBind(queueName, "amq.direct", queueName, null);
Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName);
return queueName;
}
public Measurement readMessage() {
Console.WriteLine("Receiving message...");
Measurement m = new Measurement();
int i = 0;
foreach (BasicDeliverEventArgs ev in sub) {
m = Measurement.AltDeSerialize(ev.Body);
//m.id = //get the id here, from sub
if (++i == 1)
break;
sub.Ack();
}
Console.WriteLine("Done.\n");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
}
public static string MsgSysName;
public string MsgSys
{
get
{
return MsgSysName;
}
set
{
MsgSysName = value;
}
}
public Rabbit(string _msgSys) //Constructor
{
factory = new ConnectionFactory();
factory.HostName = "localhost";
connection = factory.CreateConnection();
channel = connection.CreateModel();
//consumer = new QueueingBasicConsumer(channel);
System.Console.WriteLine("\nMsgSys: RabbitMQ");
MsgSys = _msgSys;
}
~Rabbit()
{
//observer??
connection.Dispose();
//channel.Dispose();
System.Console.WriteLine("\nDestroying RABBIT");
}
}
// Sửa 2
private List<Subscription> subscriptions = new List<Subscription>();
Subscription sub = null;
public Measurement readMessage()
{
Measurement m = new Measurement();
foreach(Subscription element in subscriptions)
{
foreach (BasicDeliverEventArgs ev in element) {
//ev = element.Next();
if(ev != null) {
m = Measurement.AltDeSerialize(ev.Body);
return m;
}
m = null;
}
}
System.Console.WriteLine("No message in the queue(s) at this time.");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
subscriptions.Add(sub);
}
// Sửa 3
//MessageHandler.cs
public class MessageHandler
{
// Implementation of methods for Rabbit class go here
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();
ConnectionFactory factory = null;
IConnection connection = null;
IModel channel = null;
QueueingBasicConsumer consumer = null;
private List<Subscription> subscriptions = new List<Subscription>();
Subscription sub = null;
public void writeMessage (Measurement m1)
{
byte[] body = Measurement.AltSerialize(m1);
//declare a queue if it doesn't exist
publishToQueue(m1.id);
channel.BasicPublish("amq.direct", m1.id, null, body);
Console.WriteLine("\n [x] Sent to queue {0}.", m1.id);
}
public void publishToQueue(string queueName)
{
string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
channel.QueueBind(finalQueueName, "amq.direct", "", null);
}
public Measurement readMessage()
{
Measurement m = new Measurement();
foreach(Subscription element in subscriptions)
{
if(element.QueueName == null)
{
m = null;
}
else
{
BasicDeliverEventArgs ev = element.Next();
if(ev != null) {
m = Measurement.AltDeSerialize(ev.Body);
m.id = element.QueueName;
element.Ack();
return m;
}
m = null;
}
element.Ack();
}
System.Console.WriteLine("No message in the queue(s) at this time.");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
subscriptions.Add(sub);
}
public static string MsgSysName;
public string MsgSys
{
get
{
return MsgSysName;
}
set
{
MsgSysName = value;
}
}
public MessageHandler(string _msgSys) //Constructor
{
factory = new ConnectionFactory();
factory.HostName = "localhost";
connection = factory.CreateConnection();
channel = connection.CreateModel();
consumer = new QueueingBasicConsumer(channel);
System.Console.WriteLine("\nMsgSys: RabbitMQ");
MsgSys = _msgSys;
}
public void disposeAll()
{
connection.Dispose();
channel.Dispose();
foreach(Subscription element in subscriptions)
{
element.Close();
}
System.Console.WriteLine("\nDestroying RABBIT");
}
}
//App1.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
public class MainClass
{
public static void Main()
{
MessageHandler obj1 = MessageHandler("Rabbit");
System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
//Create new Measurement messages
Measurement m1 = new Measurement("q1", 2345, 23.456);
Measurement m2 = new Measurement("q2", 222, 33.33);
System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id);
System.Console.WriteLine(" Time: {0}", m1.time);
System.Console.WriteLine(" Value: {0}", m1.value);
System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id);
System.Console.WriteLine(" Time: {0}", m2.time);
System.Console.WriteLine(" Value: {0}", m2.value);
// Ask queue name and store it
System.Console.WriteLine("\nName of queue to publish to: ");
string queueName = (System.Console.ReadLine()).ToString();
obj1.publishToQueue(queueName);
// Write message to the queue
obj1.writeMessage(m1);
System.Console.WriteLine("\nName of queue to publish to: ");
string queueName2 = (System.Console.ReadLine()).ToString();
obj1.publishToQueue(queueName2);
obj1.writeMessage(m2);
obj1.disposeAll();
}
}
//App2.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
public class MainClass
{
public static void Main()
{
//Asks for the message system
System.Console.WriteLine("\nEnter name of messageing system: ");
System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
string MsgSysName = (System.Console.ReadLine()).ToString();
//Declare an IMessageBus instance:
//Here, an object of the corresponding Message System
// (ex. Rabbit, Zmq, etc) is instantiated
IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
//Create a new Measurement object m
Measurement m = new Measurement();
System.Console.WriteLine("Queue name to subscribe to: ");
string QueueName1 = (System.Console.ReadLine()).ToString();
obj1.subscribeToQueue(QueueName1);
//Read message into m
m = obj1.readMessage();
if (m != null) {
System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
System.Console.WriteLine(" Time: {0}", m.time);
System.Console.WriteLine(" Value: {0}", m.value);
}
System.Console.WriteLine("Another queue name to subscribe to: ");
string QueueName2 = (System.Console.ReadLine()).ToString();
obj1.subscribeToQueue(QueueName2);
m = obj1.readMessage();
if (m != null) {
System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
System.Console.WriteLine(" Time: {0}", m.time);
System.Console.WriteLine(" Value: {0}", m.value);
}
obj1.disposeAll();
}
}
Cảm ơn rất nhiều phản hồi của bạn. Tôi vẫn đang học hệ thống tin nhắn và có những hoạt động mà tôi vẫn chưa hiểu. Thích nghe. Tôi cũng đã xem cách thỏmq đăng ký hàng đợi. Bạn có thể đăng ký nhiều hàng đợi bằng cách sử dụng Đăng ký mới (channel, queueName) không? Và nếu vậy, làm thế nào tôi có thể đi qua tất cả các hàng đợi đăng ký và trả lại một tin nhắn (null khi không có tin nhắn)? Oh, và nhớ bạn rằng tôi có tất cả các hoạt động này trong các phương pháp khác nhau. Tôi sẽ chỉnh sửa bài đăng của mình để phản ánh mã. – Demi
Cảm ơn bạn lần nữa. Tôi đã chỉnh sửa mã cho các chức năng đăng ký và viết ở trên. Tuy nhiên, tôi có lỗi thời gian chạy này: nếu tôi đăng ký để nói hai hàng đợi và cố gắng đọc tin nhắn tôi chỉ có thể truy xuất lại tin nhắn lần đầu tiên. Tôi không thể nhìn thấy nơi tôi đã làm hỏng nó. Bạn có thể xem nếu tôi? – Demi
@Demi ... có một số săn bắn. Tôi nghĩ bạn đang bỏ lỡ "subscriptions.Ack()" ở cuối vòng đọc của bạn? Điều đó có nghĩa là 'Tôi đã xử lý thành công thông báo này, vì vậy hãy cho tôi thư tiếp theo.' Hãy cho tôi biết nếu đó là nó. Nếu không, bạn nhìn gần. – sgtz