using RabbitMQ.Client;
namespace Produce
{
    // Declared inside the namespace so this program is self-contained:
    // Console/Exception live in System and Encoding lives in System.Text.
    using System;
    using System.Text;

    /// <summary>
    /// Console producer: every non-empty line typed by the user is published
    /// as one message to the test queue. An empty line (or end of input) ends the program.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Queue the messages are routed to. Because messages are published to the
        /// default ("") exchange, this name is also used as the routing key.
        /// </summary>
        private const string QueueName = "RabbitMQ_Test";

        /// <summary>
        /// Entry point: prompts once, then publishes each line read from the console
        /// until an empty line or end of input is encountered.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("Produce client test:\r");

            // Iterate instead of having ProduceMessage call itself: the recursive form
            // added one stack frame per message and would eventually overflow the stack.
            string line;
            while (!string.IsNullOrEmpty(line = Console.ReadLine()))
            {
                ProduceMessage(line);
            }
        }

        /// <summary>
        /// Opens a connection with the factory's default settings, publishes
        /// <paramref name="msg"/> to the queue and closes the connection again.
        /// Any failure is reported on the console rather than thrown.
        /// </summary>
        /// <param name="msg">Message text; null or empty input is ignored.</param>
        private static void ProduceMessage(string msg)
        {
            if (string.IsNullOrEmpty(msg))
            {
                return;
            }

            // ConnectionFactory is the main entry point of the RabbitMQ .NET client and
            // constructs IConnection instances. No properties are set here, so the
            // factory's defaults apply; assign e.g. factory.UserName, factory.Password,
            // factory.Port or factory.HostName to target a different broker/account.
            ConnectionFactory factory = new ConnectionFactory();

            try
            {
                // Create the connection to the server.
                using (IConnection conn = factory.CreateConnection())
                {
                    Console.WriteLine("Connect server successfully.");

                    // Create a new channel on that connection.
                    using (IModel channel = conn.CreateModel())
                    {
                        // Make sure the queue exists before publishing; otherwise the
                        // default exchange has nowhere to route the message and it is
                        // dropped. The arguments mirror the consumer's declaration
                        // (durable, non-exclusive, no auto-delete).
                        channel.QueueDeclare(QueueName, true, false, false, null);

                        // NOTE(review): Encoding.Default depends on the platform; the
                        // consumer must decode with the same encoding. Consider moving
                        // both sides to Encoding.UTF8 together.
                        byte[] body = Encoding.Default.GetBytes(msg);
                        channel.BasicPublish("", QueueName, null, body);
                        Console.WriteLine("Message publish successfully.");
                    }
                }
            }
            catch (Exception ex)
            {
                // Report and carry on so the user can try the next message.
                Console.WriteLine(ex.Message);
            }
        }
    }
}
消息接收端代码 (message consumer / receiving-side code):
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.IO;
namespace Consumer
{
    // Declared inside the namespace so this program is self-contained:
    // Console/Exception live in System and Encoding lives in System.Text.
    using System;
    using System.Text;

    /// <summary>
    /// Console consumer for the test queue. Supports two ways of receiving:
    /// pulling messages with BasicGet, and subscribing with BasicConsume.
    /// </summary>
    class Program
    {
        /// <summary>Name of the queue messages are read from.</summary>
        private const string QueueName = "RabbitMQ_Test";

        /// <summary>
        /// noAck flag passed to BasicGet in <see cref="SingleMessage"/>: when true the
        /// message is removed from the queue as soon as it has been received.
        /// </summary>
        private static readonly bool noAck = true;

        /// <summary>
        /// Entry point: prints a banner and starts the subscription-based consumer.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("Consumer client test:\r\n");
            // Alternative: pull the messages instead of subscribing.
            //InitiativeMessage();
            SubscriptionMessage();
        }

        #region Pull mode (BasicGet)

        /// <summary>
        /// Actively fetches messages from the queue: connects with default settings,
        /// declares the queue, drains it, then waits for a key press. Errors are
        /// written to the console.
        /// </summary>
        private static void InitiativeMessage()
        {
            // Factory with default parameters.
            ConnectionFactory factory = new ConnectionFactory();
            try
            {
                using (IConnection conn = factory.CreateConnection())
                using (IModel channel = conn.CreateModel())
                {
                    channel.QueueDeclare(QueueName, true, false, false, null);

                    // Fetch a single message:
                    //SingleMessage(channel);

                    // Fetch every message currently in the queue.
                    MuliteMessage(channel);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                // Keep the console window open until a key is pressed.
                Console.Read();
            }
        }

        /// <summary>
        /// Fetches at most one message from the queue and prints it.
        /// </summary>
        /// <param name="channel">Open channel used to issue the BasicGet.</param>
        /// <returns>
        /// true if a message was received and printed; false if BasicGet returned
        /// null, i.e. there was nothing to fetch.
        /// </returns>
        private static bool SingleMessage(IModel channel)
        {
            // With noAck set, the message is deleted from the queue once received.
            BasicGetResult result = channel.BasicGet(QueueName, noAck);
            if (result == null)
            {
                Console.WriteLine("There is no message in queue");
                return false;
            }

            // NOTE(review): Encoding.Default depends on the platform; it must match
            // the encoding the producer used. Consider Encoding.UTF8 on both sides.
            Console.WriteLine("Message is: " + Encoding.Default.GetString(result.Body));
            return true;
        }

        /// <summary>
        /// Fetches and prints messages one by one until the queue reports none left.
        /// </summary>
        /// <param name="channel">Open channel used to fetch the messages.</param>
        private static void MuliteMessage(IModel channel)
        {
            while (SingleMessage(channel))
            {
                // SingleMessage does the work; loop until it finds the queue empty.
            }
        }

        #endregion

        #region Subscription mode (BasicConsume)

        /// <summary>
        /// Subscribes to the queue and prints every delivered message, acknowledging
        /// each one after it has been handled. Runs until the consumer is cancelled,
        /// the channel/connection goes away, or an error occurs; then waits for a
        /// key press. Errors are written to the console.
        /// </summary>
        private static void SubscriptionMessage()
        {
            ConnectionFactory factory = new ConnectionFactory();
            try
            {
                using (IConnection conn = factory.CreateConnection())
                using (IModel channel = conn.CreateModel())
                {
                    // Declare the queue (same arguments as the pull-mode path) so that
                    // subscribing does not fail when the consumer starts before the
                    // queue has been created.
                    channel.QueueDeclare(QueueName, true, false, false, null);

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    // noAck = false: messages are acknowledged explicitly below.
                    channel.BasicConsume(QueueName, false, consumer);

                    // Messages are delivered asynchronously into consumer.Queue.
                    while (true)
                    {
                        try
                        {
                            // Direct cast instead of 'as': an unexpected item type
                            // surfaces as a clear InvalidCastException rather than a
                            // NullReferenceException on the next line.
                            BasicDeliverEventArgs delivery =
                                (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                            // Handle the delivery.
                            Console.WriteLine("Received message: " + Encoding.Default.GetString(delivery.Body));

                            // Acknowledge so the message is removed from the queue.
                            channel.BasicAck(delivery.DeliveryTag, false);
                        }
                        catch (EndOfStreamException)
                        {
                            Console.WriteLine("The consumer was cancelled, the model closed, or the connection went away.");
                            break;
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                            break;
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                // Keep the console window open until a key is pressed.
                Console.Read();
            }
        }

        #endregion
    }
}
Really smart. Anyway, I love to use Twitter on my site.