隐藏

Nodejs+Redis实现简易消息队列

发布:2023/11/1 23:12:02作者:管理员 来源:本站 浏览次数:203

前言


消息队列是存储数据的一个中间件,可以理解为一个容器。生产者生产消息投递 到队列中,消费者可以拉取消息进行消费,如果消费者目前没有消费的打算,则消息队列会保留消息,直到消费者有消费的打算。

在这里插入图片描述

设计思路

生产者


   连接 redis

   向指定通道 通过 lpush 消息


消费者


   连接 redis

   死循环通过 brpop 阻塞式获取消息

   拿到消息进行消费

   循环拿去下一个消息


Redis

安装及启动


   此步骤各位道友随意就好,不一定要用docker 。只要保证自己能连接到redis 服务即可。


# 使用docker 拉取redis 镜像

docker pull redis:latest


# 启动redis服务

# --name 后面是容器名字方便后续维护和管理

# -p 后面是指映射容器服务的 6379 端口到宿主机的 6379 端口

docker run -itd --name redis-mq -p 6379:6379 redis



# ============ docker 常用基本操作(题外话) =================


# 拉取镜像

docker pull 镜像名称


# 查看镜像

docker images


# 删除镜像

docker rmi 镜像名称


# 查看运行容器(仅为启动中的)

docker ps


# 查看运行容器(包含未启动)

docker ps -a


# 启动容器

docker start 容器名称/容器id


# 停止容器

docker stop 容器名称/容器id


Nodejs连接


初始化工程


# 创建文件夹并进入

mkdir queue-node-redis && cd queue-node-redis


# yarn 初始化

yarn init -y


# 下载redis包,

# 指定版本的原因是尽量减少道友们的失败几率 毕竟前端的工具迭代太快了

yarn add redis@4.2.0  


创建 lib 与 utils 目录


├── .gitignore

├── lib

├── package.json

├── utils

│   └── redis.js

└── yarn.lock


utils/redis.js


const redis = require("redis");


const redisCreateClient = async (config) => {

 try {

   const client = redis.createClient({

     url: `redis://${config.host}:${config.port}`,

   });

   await client.connect();

   await client.select(config.db);

   console.log("redis connect success");

   return client;

 } catch (err) {

   console.log("redis connect error");

   throw err;

 }

};


module.exports = {

 redisCreateClient,

};


index.js


   在项目根目录下创建此文件,测试redis连接是否成功


const { redisCreateClient } = require("./utils/redis");

const test = async () => {

 const client = await redisCreateClient({

   host: "127.0.0.1",

   port: 6379,

   db: 0,

 });

};

test();


出现如下图所示即可

在这里插入图片描述

minimist


轻量级的命令行参数解析引擎。


# 安装 minimist

yarn add minimist@1.2.6


使用方法


const systemArg = require("minimist")(process.argv.slice(2));

console.log(systemArg);


# 运行

node index.js --name test


# 输出

{ _: [], name: 'test' }


正文开始


   从目录结构及文件创建,手把手教程


在这里插入图片描述

目录结构变更


├── config.js       # 配置文件

├── lib

│   └── index.js # 主目录入口文件

├── package.json

├── utils                 # 工具函数库

│   └── redis.js

└── yarn.lock


config.js


   配置文件思路的重要性大于代码的实现


module.exports = {

 // redis 配置

 redis: {

   default: {

     host: "127.0.0.1",

     port: 6379,

     password: "",

     db: 0,

   },

 },

 // 消息队列频道设置

 mqList: [

   {

     // 消息频道名称

     name: "QUEUE_MY_MQ",

     // 阻塞式取值超时配置

     brPopTimeout: 100,

     // 开启任务数 此配置需要 PM 启动生效

     instances: 1,

     // redis 配置key

     redis: "default",

   },

 ],

};


参考 前端进阶面试题详细解答

lib/index.js


   针对配置做程序异常处理


const systemArg = require("minimist")(process.argv.slice(2));

const config = require("../config");

const { bootstrap } = require("./core");


// 程序自检


// 判断是否输入了 频道名称

if (!systemArg.name) {

 console.error("ERROR: channel name cannot be empty");

 process.exit(99);

}


// 频道队列配置

const mqConfig =

 config.mqList.find((item) => item.name === systemArg.name) ?? false;


// 如果config不存在

if (!mqConfig) {

 console.error("ERROR:  configuration not obtained");

 process.exit(99);

}


// redis 配置

const redisConfig = config.redis[mqConfig.redis];

if (!redisConfig) {

 console.error("ERROR: redis configuration not obtained");

 process.exit(99);

}


// node index.js --name QUEUE_MY_MQ

bootstrap(mqConfig, redisConfig);


lib/core.js


   后面的核心逻辑写在此处


async function bootstrap(config) {

 console.log(config);

}


module.exports = {

 bootstrap,

};


核心逻辑

lib/core.js


const { redisCreateClient } = require("../utils/redis");

async function bootstrap(mqConfig, redisConfig) {

 try {

   // 创建redis连接

   const client = await redisCreateClient(redisConfig);

   // 通过死循环阻塞程序

   while (true) {

     let res = null;

     console.log("队列执行");

     try {

       // 从队列中获取任务, 采用阻塞式获取任务 最大阻塞时间为config.queue.timeout

       res = await client.brPop(mqConfig.name, mqConfig.brPopTimeout);

       if (res === null) {

         continue;

       }

       console.log("TODO:: Task processing", res);

     } catch (error) {

       console.log("ERROR: redis brPop error", error);

       continue;

     }

   }

 } catch (err) {

   // 处理程序异常

   console.log("ERROR: ", err);

   process.exit(1);

 }

}

module.exports = {

 bootstrap,

};


生成测试数据


   为了接下来的测试,我们先生成一些测试数据


test/mockMq.js


const { redisCreateClient } = require("../utils/redis");

const config = require("../config");


/** 生成 1000 条测试消息 */

const mockMq = async (key) => {

 const client = await redisCreateClient(config.redis.default);

 for (let i = 0; i < 1000; i++) {

   // 向队列中 push 消息

   await client.lPush(key, "test" + i);

 }

 // 获取队列长度

 const count = await client.lLen(key);

 console.log(`生成1000条测试消息完成,目前共有${count}条消息`);

 // 关闭redis连接

 client.quit();

};


mockMq("QUEUE_MY_MQ");


验证脚本有效性


# 执行消息生成命令

node ./test/mockMq.js


# 程序输出

# redis connect success

# 生成 1000 条测试消息 完成,目前共有 1000 条消息


# 执行开启消费者

node ./lib/index.js --name QUEUE_MY_MQ

# TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test0' }

# TODO:: Task processing .......

# TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test999' }