隐藏

在C#中四种模式使用RabbitMQ基本连接方式Dome

发布:2023/1/9 9:53:27作者:管理员 来源:本站 浏览次数:337

1.先部署好RabbitMQ


2.进入登录页面


3.创建虚拟机


4.添加新用户


5.新用户绑定虚拟机


6.代码创建与rabbitmq链接


需下载引用包




    public class RabbitMQHelper

       {

           /// <summary>

           /// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)

           /// </summary>

           /// <returns></returns>

           public static IConnection GetConnection() {

               //创建连接工厂【设置相关属性】

               var connectionFactory = new ConnectionFactory()

               {

                   //设置IP

                   HostName = "127.0.0.1",//RabbitMQ地址

                   Port = 5672,//端口

                   VirtualHost = "/test",//RabbitMQ中要请求的VirtualHost名称

                   UserName ="test",//RabbitMQ用户

                   Password= "test"//RabbitMQ用户密码

   

               };

               //通过工厂创建连接对象

               return connectionFactory.CreateConnection();

   

           }

       }


7.简单模式


生产者


   /// <summary>

       /// 简单队列模式-生产者

       /// </summary>

       public class SampleProducer

       {

           /// <summary>

           /// 简单队列模式

           /// 生产者

           /// 通过管道向RabbitMQ发送消息

           /// </summary>

           public void SendMessage() {

               //获取连接对象

               using var connection=RabbitMQHelper.GetConnection();

               //创建管道

               using var channel= connection.CreateModel();

               //创建队列

               channel.QueueDeclare("sample_queue",false,false,false,null);

   

               for (int i = 0; i < 5; i++)

               {

                   string msg = $"Hello Word Message{i + 1}";

                   var body = Encoding.UTF8.GetBytes(msg);

                   //向RabbitMQ发送消息

                   //routingKey指定发往哪个队列(如routingkey和队列名相同则为默认队列)

                   //exchange为空则默认链接的交换机, false,null, body为消息主体

                   channel.BasicPublish("", "sample_queue", false, null, body);

                   //输出发送的消息

                   Console.WriteLine(msg);

               }

           }

       }


消费者


    /// <summary>

       /// 简单队列模式-消费者

       /// </summary>

       public class SampleConsumer

       {

           /// <summary>

           /// 简单队列模式

           /// 消费者

           /// 通过管道向RabbitMQ获取消息且消费消息

           /// </summary>

           public static void ConsumerMessage() {

               //获取连接对象

               var connection=RabbitMQHelper.GetConnection();

               //创建管道

               var chennel=connection.CreateModel();

               //创建队列(如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)

               chennel.QueueDeclare("sample_queue", false, false, false, null);

   

               //事件对象

              var consumer= new EventingBasicConsumer(chennel);

               consumer.Received += (model, ea) => {

                   //获取消息

                   var body=ea.Body;//获取队列中消息主体

                   var msg=Encoding.UTF8.GetString(body.ToArray());//转为string

                   var routingkey=ea.RoutingKey;//队列名

                   Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");

               };

               //消费消息

               chennel.BasicConsume("sample_queue", true, consumer);

   

           }

       }


8.发布订阅模式


生产者


    /// <summary>

       /// 发布订阅模式-生产者

       /// </summary>

       public class FanoutProducer

       {

           /// <summary>

           /// 发布订阅模式

           /// 生产者

           /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可

           /// </summary>

           public void SendMessage() {

               //获取连接对象

               using var connection = RabbitMQHelper.GetConnection();

               //创建管道

               using var channel = connection.CreateModel();

               //创建交换机

               channel.ExchangeDeclare("fanout_Exchange","fanout",false,false,null);

               //创建队列

               channel.QueueDeclare("fanout_queue1",false,false,false,null);

               channel.QueueDeclare("fanout_queue2", false, false, false, null);

               channel.QueueDeclare("fanout_queue3", false, false, false, null);

               //把队列绑定到交换机(就是队列订阅交换机)

               channel.QueueBind("fanout_queue1", "fanout_Exchange", "",null);

               channel.QueueBind("fanout_queue2", "fanout_Exchange", "", null);

               channel.QueueBind("fanout_queue3", "fanout_Exchange", "", null);

   

               for (int i = 0; i < 10; i++)

               {

                   string msg = $"RabbiteMQ fanout Message{i+1}";

                   var body=Encoding.UTF8.GetBytes(msg);

                   //发送消息

                   //创建了交换机则不需要routingkey了,因为只需要队列订阅交换机即可

                   //消息发送到交换机,创建了的队列订阅了交换机则会自动去交换机拿值

                   channel.BasicPublish("fanout_Exchange","",false,null,body);

                   Console.WriteLine(msg);

               }

           }

       }


消费者


   /// <summary>

       /// 发布订阅模式-消费者

       /// </summary>

       public class FanoutConsumer

       {

           /// <summary>

           /// 发布订阅模式

           /// 消费者

           /// 通过管道向RabbitMQ获取消息且消费消息

           /// </summary>

           public static void ConsumerMessage()

           {

               //获取连接对象

               var connection = RabbitMQHelper.GetConnection();

               //创建管道

               var channel = connection.CreateModel();

   

               /*

                * 在消费者创建队列/交换机是以防范先启动消费者而引发报错

                * (如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)

               */

   

               //创建交换机

               channel.ExchangeDeclare("fanout_Exchange", "fanout", false, false, null);

               //创建队列

               channel.QueueDeclare("fanout_queue1", false, false, false, null);

               channel.QueueDeclare("fanout_queue2", false, false, false, null);

               channel.QueueDeclare("fanout_queue3", false, false, false, null);

               //把队列绑定到交换机(就是队列订阅交换机)

               channel.QueueBind("fanout_queue1", "fanout_Exchange", "", null);

               channel.QueueBind("fanout_queue2", "fanout_Exchange", "", null);

               channel.QueueBind("fanout_queue3", "fanout_Exchange", "", null);

   

               //事件对象

               var consumer = new EventingBasicConsumer(channel);

               consumer.Received += (model, ea) => {

                   //获取消息

                   var body = ea.Body;//获取队列中消息主体

                   var msg = Encoding.UTF8.GetString(body.ToArray());//转为string

                   var routingkey = ea.RoutingKey;//队列名

                   Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");

               };

               //消费消息

               //可一次消费多个队列消息(也可消费单个队列)

               channel.BasicConsume("fanout_queue1", true, consumer);

               channel.BasicConsume("fanout_queue2", true, consumer);

               channel.BasicConsume("fanout_queue3", true, consumer);

           }

       }


9.路由模式


生产者


    /// <summary>

       /// 路由模式-生产者

       /// </summary>

      public class DirectProducer

       {

           /// <summary>

           /// 路由模式

           /// 生产者

           /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可

           /// 给队列设置指定routingkey,发送消息时可通过订阅了交换机队列的routingkry指定发送给某个队列

           /// </summary>

           public void SendMessage()

           {

               //获取连接对象

               using var connection = RabbitMQHelper.GetConnection();

               //创建管道

               using var channel = connection.CreateModel();

               //创建交换机

               channel.ExchangeDeclare("direct_Exchange", "direct", false, false, null);

               //创建队列

               channel.QueueDeclare("direct_queue1", false, false, false, null);

               channel.QueueDeclare("direct_queue2", false, false, false, null);

               channel.QueueDeclare("direct_queue3", false, false, false, null);

   

               //把队列绑定到交换机(就是队列订阅交换机)

               channel.QueueBind("direct_queue1", "direct_Exchange", "info", null);//设置routingkey(可不同的消息发往不同的队列)  info:为正常的消息

               channel.QueueBind("direct_queue2", "direct_Exchange", "warn", null);//warn:非正常的消息

               channel.QueueBind("direct_queue3", "direct_Exchange", "error", null);//error:错误的消息

   

               for (int i = 0; i < 10; i++)

               {

                   string msg = $"RabbiteMQ direct Message{i + 1}";

                   var body = Encoding.UTF8.GetBytes(msg);

                   //发送消息

                   //根据已订阅交换机队列的routingkey指定发送消息

                   channel.BasicPublish("direct_Exchange", "info", false, null, body);

                   Console.WriteLine(msg);

               }

           }

       }


消费者


   /// <summary>

       /// 路由模式-消费者

       /// </summary>

      public class DirectConsumer

       {

           /// <summary>

           /// 路由模式-消费者

           /// 消费者

           /// 通过管道向RabbitMQ获取消息且消费消息

           /// </summary>

           public static void ConsumerMessage()

           {

               //获取连接对象

               var connection = RabbitMQHelper.GetConnection();

               //创建管道

               var channel = connection.CreateModel();

   

               /*

                * 在消费者创建队列/交换机是以防范先启动消费者而引发报错

                * (如队列已存在的情况可不用再次创建/此创建为:确保先开启消费者,生产者未创建队列而引发报错)

               */

   

               //创建交换机

               channel.ExchangeDeclare("direct_Exchange", "direct", false, false, null);

               //创建队列

               channel.QueueDeclare("direct_queue1", false, false, false, null);

               channel.QueueDeclare("direct_queue2", false, false, false, null);

               channel.QueueDeclare("direct_queue3", false, false, false, null);

               //把队列绑定到交换机(就是队列订阅交换机)

               channel.QueueBind("direct_queue1", "direct_Exchange", "info", null);

               channel.QueueBind("direct_queue2", "direct_Exchange", "warn", null);

               channel.QueueBind("direct_queue3", "direct_Exchange", "error", null);

   

               //事件对象

               var consumer = new EventingBasicConsumer(channel);

               consumer.Received += (model, ea) => {

                   //获取消息

                   var body = ea.Body;//获取队列中消息主体

                   var msg = Encoding.UTF8.GetString(body.ToArray());//转为string

                   var routingkey = ea.RoutingKey;//队列名

                   Console.WriteLine($"message is:{msg} ,routingkey => {routingkey}");

               };

   

               //消费消息

               //可一次消费多个队列消息(也可消费单个队列)

               channel.BasicConsume("direct_queue1", true, consumer);

               channel.BasicConsume("direct_queue2", true, consumer);

               channel.BasicConsume("direct_queue3", true, consumer);

           }

       }


10.主题模式


生产者


/// <summary>

   /// 主题模式-生产者

   /// </summary>

  public class TopicProducer

   {

       /// <summary>

       /// 主题模式

       /// 生产者

       /// 设置交换机,创建多个队列,队列订阅交换机,生产者往交换机发送消息即可

       /// 给队列设置指定routingkey,发送消息时可通过订阅了交换机队列的routingkry指定发送给某个队列

       /// </summary>

       public void SendMessage()

       {

           //获取连接对象

           using var connection = RabbitMQHelper.GetConnection();

           //创建管道

           using var channel = connection.CreateModel();

           //创建交换机

           channel.ExchangeDeclare("topic_Exchange", "topic", false, false, null);

           //创建队列

           channel.QueueDeclare("topic_queue1", false, false, false, null);

           channel.QueueDeclare("topic_queue2", false, false, false, null);

           channel.QueueDeclare("topic_queue3", false, false, false, null);


           //把队列绑定到交换机(就是队列订阅交换机)

           channel.QueueBind("topic_queue1", "topic_Exchange", "user.insert", null);//设置routingkey(可不同的消息发往不同的队列)  info:为正常的消息

           channel.QueueBind("topic_queue2", "topic_Exchange", "user.update", null);//warn:非正常的消息

           //通配符: * 代表随意一个单词  # 代表任意组合词汇

           //(在添加其它队列时,符合通配符条件则这个队列消息也会添加进去)

           channel.QueueBind("topic_queue3", "topic_Exchange", "user.*", null);//error:错误的消息


           for (int i = 0; i < 10; i++)

           {

               string msg = $"RabbiteMQ topic Message{i + 1}";

               var body = Encoding.UTF8.GetBytes(msg);

               //发送消息

               //根据已订阅交换机队列的routingkey指定发送消息

               channel.BasicPublish("topic_Exchange", "user.update", false, null, body);

               Console.WriteLine(msg);

           }

       }

   }