隐藏

ASPNET.Core结合Redis实践消息队列,从此放心安全迭代

发布:2022/8/6 17:02:26作者:管理员 来源:本站 浏览次数:928


引言


熟悉TPL Dataflow博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工作流任务, 在使用Docker部署的过程中, 有一个问题一直无法回避:


      在单体程序部署的瞬间(服务不可用)会有少量流量无法处理;更糟糕的情况下,迭代部署的这个版本有问题,上线后无法运作, 更多的流量没有得到处理。


     背负神圣使命(巨大压力)的程序猿心生一计, 为何不将单体程序改成分布式:增加服务ReceiverApp只接收数据,服务WebApp只处理数据。



知识储备


   消息队列和订阅发布作为老生常谈的两个知识点被反复提及,按照JMS的规范, 官方称为点对点(point to point,message queue) 和 订阅发布(publish/subscribe,channel / topic )


点对点


消息生产者生产消息发送到Message Queue中,然后消费者从队列中取出消息并消费。


   队列会保留消息,直到他们被消费或超时;


   ① MQ支持多消费者,每个消息只能被一个消费者处理


   ② 消息发送者和消费者在时间上没有依赖性,当发送者发送消息之后, 不管消费者有没有在运行(甚至不管有没有消费者),都不会影响到消息被发送到队列


   ③ 一般消费者在消费之后需要向队列应答成功


   如果希望发送的消息都被处理,或只能被处理一次,你应该使用p2p模型。


发布/订阅


消息生产者将消息发布到Channel,同时有多个消息消费者(订阅)该消息。和点对点方式不同,发布到 特定通道的消息会被通道订阅者实时接收。


   通道 只有暂存机制,发布的消息只能被当前订阅者收到。


   ①每个消息可以有多个消费者


   ②发布者和消费者 有时间上依赖性, 针对某topic的订阅者,必须先创建相应订阅,才能消费消息


   将消息发布到通道中,而不关注订阅者是谁;订阅者可收听自己感兴趣的多个通道(形成Topic), 也不关注发布者是谁。


   ③ 故如果没有消费者,发布的消息将得不到处理;


   如果希望广播的消息被实时接收,应该采用发布-订阅模型。



头脑风暴


Redis 内置的List数据结构亦能形成轻量级MQ的效果,Redis 原生支持发布/订阅 模型。


如上所述, Pub/Sub 模型 在订阅者宕机的时候,发布的消息得不到处理,故此模型不能用于高可靠性的的数据接收和处理。


本次采用的消息队列模型:


      解耦业务:  新建Receiver程序作为生产者,专注于接收并发送到队列;原有的webapp作为消费者专注数据处理。

      起到削峰填谷的作用, 若建立多个消费者webapp容器,还能形成负载均衡的效果。


   需要关注Redis 两个命令( 左进右出,右进左出同理):


   LPUSH  &  RPOP/BRPOP


         明显的思路是 lpush, rpop,但是如果队列空了,消费者会陷入pop死循环,即使没有数据也不会停止,空轮询不但消耗消费者的CPU资源还会影响Redis性能。


         一个缓解的做法是让消费者线程设定一定的时间间隔 循环监控队列,虽然可行,但显然会造成不必要的资源浪费,而且循环间隔也难以确定。


   brpop 中的B 表示 “Block”, 是一个rpop命令的阻塞版本:


   若指定List没有新元素,在给定超时时间内,该命令会阻塞当前redis连接,直到超时返回nil,或者有另外一个客户端对给定key的任意一个执行LPUSH或RPUSH命令为止, 这样做的目的在于减小Redis压力。  


   对于Redis,阻塞读在队列没有数据会立即进入休眠状态,一旦数据到立即被唤醒,消息延迟几乎为0.


编程实践


本次使用 ASPNetCore 完成RedisMQ的实践,引入Redis国产第三方开源库CSRedisCore.


   不使用著名的StackExchange.Redis 组件库的原因:


       之前一直使用StackExchange.Redis, 参考了很多资料,做了很多优化,并未完全解决RedisTimeoutException问题


       StackExchange.Redis基于其多路复用的连接机制,不支持阻塞式命令, 故采用了 CSRedisCore,该库强调了API 与Redis官方命令一致,很容易上手


生产者Receiver


生产者使用LPush 命令向Redis List数据结构写入消息。

复制代码


------------------截取自Startup.cs-------------------------

public void ConfigureServices(IServiceCollection services)

{

// Redis客户端要定义成单例, 不然在大流量并发收数的时候, 会造成redis client来不及释放。另一方面也确认api控制器不是单例模式,

var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");

RedisHelper.Initialization(csredis);

services.AddSingleton(csredis);


services.AddMvc();

}


------------------截取自数据接收Controller-------------------

[Route("batch")]

[HttpPost]

public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs)

{

if (!ModelState.IsValid)

throw new ArgumentException("Http Body Payload Error.");

var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";

  eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);

  if (eqidPairs != null && eqidPairs.Any())

RedisHelper.LPush(redisKey, eqidPairs.ToArray());

await Task.CompletedTask;

}


复制代码



消费者webapp


    根据以上RedisMQ思路,事件消费方式是拉取pull,故需要轮询Redis  List数据结构,这里使用ASPNetCore内置的BackgroundService后台服务类实现后台轮询消费任务。

复制代码


public class BackgroundJob : BackgroundService

   {

       private readonly IEqidPairHandler _eqidPairHandler;

       private readonly CSRedisClient[] _cSRedisClients;

       private readonly IConfiguration _conf;

       private readonly ILogger _logger;

       public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory)

       {

           _eqidPairHandler = eqidPairHandler;

           _cSRedisClients = csRedisClients;

           _conf = conf;

           _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));

       }


       protected override async Task ExecuteAsync(CancellationToken stoppingToken)

       {

           _logger.LogInformation("Service starting");

           if (_cSRedisClients[0] == null)

           {

               _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);

           }

           RedisHelper.Initialization(_cSRedisClients[0]);


           while (!stoppingToken.IsCancellationRequested)

           {

               var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";

               var eqidpair = RedisHelper.BRPop(5, key);     // 阻塞读,若指定List没有新元素,在给定超时时间内,该命令会阻塞当前redis连接,直到超时返回nil, 这样做的目的在于减小Redis压力。  

               if (eqidpair != null)

                   await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));


               // 弱建议休眠一段时间,防止突发大流量导致webApp进程CPU满载,自行根据场景设置合理休眠时间, 不设置也是可以的

               await Task.Delay(5, stoppingToken);

           }

           _logger.LogInformation("Service stopping");

       }

   }


复制代码


最后依照引言中的部署原理图,将Nginx,Receiver, WebApp使用docker-compose工具容器化


根据docker-compsoe up命令的用法,若容器正在运行且对应的Service Configuration或Image并未改变,该容器不会被ReCreate;


docker-compose  up指令只会重建(Service或Image变更)的容器。


   If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation, docker-compose up picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the --no-recreate flag.


做一次上线测试验证,修改docker-compose.yml文件Web app的容器服务,docker-compose up;


仅数据处理程序WebApp容器被重建:


Nice,分布式改造上线,效果很明显,现在可以放心安全的迭代Web App数据处理程序。