隐藏

asp.net c# 通过消息队列处理高并发请求(以抢手机为例)

发布:2021/11/6 11:35:01作者:管理员 来源:本站 浏览次数:633



原文地址:https://www.cnblogs.com/chenxizhaolu/p/12543376.html


网站面对高并发的情况下,除了增加硬件, 优化程序提高以响应速度外,还可以通过并行改串行的思路来解决。这种思想常见的实践方式就是数据库锁和消息队列的方式。这种方式的缺点是需要排队,响应速度慢,优点是节省成本。

演示一下现象


创建一个在售产品表

复制代码

复制代码


CREATE TABLE [dbo].[product](

   [id] [int] NOT NULL,--唯一主键

   [name] [nvarchar](50) NULL,--产品名称

   [status] [int] NULL ,--0未售出  1 售出  默认为0

   [username] [nvarchar](50) NULL--下单用户

)


复制代码

复制代码


添加一条记录


insert into product(id,name,status,username) values(1,'小米手机',0,null)


创建一个抢票程序

复制代码

复制代码


public ContentResult PlaceOrder(string userName)

       {

           using (RuanMou2020Entities db = new RuanMou2020Entities())

           {

                   var product = db.product.Where<product>(p => p.status== 0).FirstOrDefault();

                   if (product.status == 1)

                   {

                       return Content("失败,产品已经被卖光");

                   }

                   else

                   {

                       //模拟数据库慢造成并发问题

                       Thread.Sleep(5000);

                       product.status = 1;

                       product.username= userName;

db.SaveChanges();

return Content("成功购买");

}

}

}


复制代码

复制代码


如果我们在5秒内一次访问以下两个地址,那么返回的结果都是成功购买且数据表中的username是lisi。


/controller/PlaceOrder?username=zhangsan


/controller/PlaceOrder?username=lisi


这就是并发带来的问题。

第一阶段,利用线程锁简单粗暴


Web程序是多线程的,那我们把他在容易出现并发的地方加一把锁就可以了,如下图处理方式。

复制代码

复制代码


       private static object _lock = new object();


       public ContentResult PlaceOrder(string userName)

       {

           using (RuanMou2020Entities db = new RuanMou2020Entities())

           {

               lock (_lock)

               {

                   var product = db.product.Where<product>(p => p.status == 0).FirstOrDefault();

                   if (product.status == 1)

                   {

                       return Content("失败,产品已经被卖光");

                   }

                   else

                   {

                       //模拟数据库慢造成并发问题

                       Thread.Sleep(5000);

                       product.status = 1;

                       product.username = userName;

                       db.SaveChanges();

                       return Content("成功购买");

                   }

               }

           }

       }


复制代码

复制代码


这样每一个请求都是依次执行,不会出现并发问题了。


优点:解决了并发的问题。


缺点:效率太慢,用户体验性太差,不适合大数据量场景。

第二阶段,拉消息队列,通过生产者,消费者的模式


1,创建订单提交入口(生产者)

复制代码

复制代码


public class HomeController : Controller

   {


       /// <summary>

       /// 接受订单提交(生产者)

       /// </summary>

       /// <returns></returns>

       public ContentResult PlaceOrderQueen(string userName)

       {

           //直接将请求写入到订单队列

           OrderConsumer.TicketOrders.Enqueue(userName);

           return Content("wait");

       }


       /// <summary>

       /// 查询订单结果

       /// </summary>

       /// <returns></returns>

       public ContentResult PlaceOrderQueenResult(string userName)

       {

           var rel = OrderConsumer.OrderResults.Where(p => p.userName == userName).FirstOrDefault();

           if (rel == null)

           {

               return Content("还在排队中");

           }

           else

           {

               return Content(rel.Result.ToString());

           }

       }

}


复制代码

复制代码




2,创建订单处理者(消费者)

复制代码

复制代码


/// <summary>

   /// 订单的处理者(消费者)

   /// </summary>

   public class OrderConsumer

   {

       /// <summary>

       /// 订票的消息队列

       /// </summary>

       public static ConcurrentQueue<string> TicketOrders = new ConcurrentQueue<string>();

       /// <summary>

       /// 订单结果消息队列

       /// </summary>

       public static List<OrderResult> OrderResults = new List<OrderResult>();

       /// <summary>

       /// 订单处理

       /// </summary>

       public static void StartTicketTask()

       {

           string userName = null;

           while (true)

           {

               //如果没有订单任务就休息1秒钟

               if (!TicketOrders.TryDequeue(out userName))

               {

                   Thread.Sleep(1000);

                   continue;

               }

               //执行真实的业务逻辑(如插入数据库)

               bool rel = new TicketHelper().PlaceOrderDataBase(userName);

               //将执行结果写入结果集合

               OrderResults.Add(new OrderResult() { Result = rel, userName = userName });

           }

       }

   }


复制代码

复制代码


3,创建订单业务的实际执行者

复制代码

复制代码


/// <summary>

   /// 订单业务的实际处理者

   /// </summary>

   public class TicketHelper

   {

       /// <summary>

       /// 实际库存标识

       /// </summary>

       private bool hasStock = true;

       /// <summary>

       /// 执行一个订单到数据库

       /// </summary>

       /// <returns></returns>

       public bool PlaceOrderDataBase(string userName)

       {

           //如果没有了库存,则直接返回false,防止频繁读库

           if (!hasStock)

           {

               return hasStock;

           }

           using (RuanMou2020Entities db = new RuanMou2020Entities())

           {

               var product = db.product.Where(p => p.status == 0).FirstOrDefault();

               if (product == null)

               {

                   hasStock = false;

                   return false;

               }

               else

               {

                   Thread.Sleep(10000);//模拟数据库的效率比较慢,执行插入时间比较久

                   product.status = 1;

                   product.username = userName;

                   db.SaveChanges();

                   return true;

               }

           }

       }

   }

   /// <summary>

   /// 订单处理结果实体

   /// </summary>

   public class OrderResult

   {

       public string userName { get; set; }

       public bool Result { get; set; }

   }


复制代码

复制代码


4,在程序启动前,启动消费者线程

复制代码

复制代码


protected void Application_Start()

       {

           AreaRegistration.RegisterAllAreas();

           GlobalConfiguration.Configure(WebApiConfig.Register);

           FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);

           RouteConfig.RegisterRoutes(RouteTable.Routes);

           BundleConfig.RegisterBundles(BundleTable.Bundles);


           //在Global的Application_Start事件里单独开启一个消费者线程

           Task.Run(OrderConsumer.StartTicketTask);

       }


复制代码

复制代码


这样程序的运行模式是:用户提交的需求里都会添加到消息队列里去排队处理,程序会依次处理该队列里的内容(当然可以一次取出多条来进行处理,提高效率)。


优点:比上一步快了。


缺点:不够快,而且下单后需要轮询另外一个接口判断是否成功。

第三阶段 反转生产者消费者的角色,把可售产品提前放到队列里,然后让提交的订单来消费队列里的内容


1,创建生产者并且在程序启动前调用其初始化程序

复制代码

复制代码


public class ProductForSaleManager

   {

       /// <summary>

       /// 待售商品队列

       /// </summary>

       public static ConcurrentQueue<int> ProductsForSale = new ConcurrentQueue<int>();

       /// <summary>

       /// 初始化待售商品队列

       /// </summary>

       public static void Init()

       {

           using (RuanMou2020Entities db = new RuanMou2020Entities())

           {

               db.product.Where(p => p.status == 0).Select(p => p.id).ToList().ForEach(p =>

               {

                   ProductsForSale.Enqueue(p);

               });

           }

       }

   }


复制代码

复制代码

复制代码

复制代码


public class MvcApplication : System.Web.HttpApplication

   {

       protected void Application_Start()

       {

           AreaRegistration.RegisterAllAreas();

           GlobalConfiguration.Configure(WebApiConfig.Register);

           FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);

           RouteConfig.RegisterRoutes(RouteTable.Routes);

           BundleConfig.RegisterBundles(BundleTable.Bundles);


           //程序启动前,先初始化待售产品消息队列

           ProductForSaleManager.Init();

       }

   }


复制代码

复制代码


2,创建消费者

复制代码

复制代码


public class OrderController : Controller

   {

       /// <summary>

       /// 下订单

       /// </summary>

       /// <param name="userName">订单提交者</param>

       /// <returns></returns>

       public async Task<ContentResult> PlaceOrder(string userName)

       {

           if (ProductForSaleManager.ProductsForSale.TryDequeue(out int pid))

           {

               await new TicketHelper2().PlaceOrderDataBase(userName, pid);

               return Content($"下单成功,对应产品id为:{pid}");

           }

           else

           {

               await Task.CompletedTask;

               return Content($"商品已经被抢光");

           }

       }

   }


复制代码

复制代码


3,当然还需要一个业务的实际执行者

复制代码

复制代码


/// <summary>

   /// 订单业务的实际处理者

   /// </summary>

   public class TicketHelper2

   {

       /// <summary>

       /// 执行复杂的订单操作(如数据库)

       /// </summary>

       /// <param name="userName">下单用户</param>

       /// <param name="pid">产品id</param>

       /// <returns></returns>

       public async Task PlaceOrderDataBase(string userName, int pid)

       {

           using (RuanMou2020Entities db = new RuanMou2020Entities())

           {

               var product = db.product.Where(p => p.id == pid).FirstOrDefault();

               if (product != null)

               {

                   product.status = 1;

                   product.username = userName;

                   await db.SaveChangesAsync();

               }

           }

       }

   }


复制代码

复制代码


这样我们同时访问下面三个地址,如果数据库里只有两个商品的话,会有一个请求结果为:商品已经被抢光。


http://localhost:88/Order/PlaceOrder?userName=zhangsan


http://localhost:88/Order/PlaceOrder?userName=lisi


http://localhost:88/Order/PlaceOrder?userName=wangwu


这种处理方式的优点为:执行效率快,相比第二种方式不需要第二个接口来返回查询结果。


缺点:暂时没想到,欢迎大家补充。


说明:该方式只是个人猜想,并非实际项目经验,大家只能作为参考,慎重用于项目。欢迎大家批评指正。