分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 教程案例

架构设计之NodeJS操作消息队列RabbitMQ

发布时间:2023-09-06 02:04责任编辑:沈小雨关键词:消息队列Node

一. 什么是消息队列?

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

二. 常用的消息队列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至现在部分NoSQL也可做消息队列,如Redis。

三. 消息队列的使用场景?

  • 异步处理

  • 应用解耦

  • 流量削峰

四. 使用案例

上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?

图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如ElasticSearch或Hive中。

五. 如何安装RabbitMQ?

上面的案例介绍了MQ的一个使用场景,我这里是用RabbitMQ举例,现实项目中可能用到的是Kafka。

  1. 首先安装brew(mac为例)

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 
  2. 安装RabbitMQ

    brew install rabbitmq
  3. 运行RabbitMQ

    进入到 /usr/local/Cellar/rabbitmq/3.7.7,执行

    sbin/rabbitmq-server
  4. 启动插件

    进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management
  5. 登陆管理界面

    打开浏览器输入:http://localhost:15672,RabbitMQ默认15672端口六. Nodejs操作RabbitMQ

     

网上可以找到好几个相应的Node SDK,这里推荐amqplib

1. 生产者

/** * 对RabbitMQ的封装 */let amqp = require(‘amqplib‘);class RabbitMQ { ???constructor() { ???????this.hosts = []; ???????this.index = 0; ???????this.length = this.hosts.length; ???????this.open = amqp.connect(this.hosts[this.index]); ???} ???sendQueueMsg(queueName, msg, errCallBack) { ???????let self = this; ???????self.open ???????????.then(function (conn) { ???????????????return conn.createChannel(); ???????????}) ???????????.then(function (channel) { ???????????????return channel.assertQueue(queueName).then(function (ok) { ???????????????????return channel.sendToQueue(queueName, new Buffer(msg), { ???????????????????????persistent: true ???????????????????}); ???????????????}) ???????????????????.then(function (data) { ???????????????????????if (data) { ???????????????????????????errCallBack && errCallBack("success"); ???????????????????????????channel.close(); ???????????????????????} ???????????????????}) ???????????????????.catch(function () { ???????????????????????setTimeout(() => { ???????????????????????????if (channel) { ???????????????????????????????channel.close(); ???????????????????????????} ???????????????????????}, 500) ???????????????????}); ???????????}) ???????????.catch(function () { ???????????????let num = self.index++; ???????????????if (num <= self.length - 1) { ???????????????????self.open = amqp.connect(self.hosts[num]); ???????????????} else { ???????????????????self.index == 0; ???????????????} ???????????}); ???}}

2. 消费者

/** * 对RabbitMQ的封装 */let amqp = require(‘amqplib‘);class RabbitMQ { ???constructor() { ???????this.open = amqp.connect(this.hosts[this.index]); ???} ???receiveQueueMsg(queueName, receiveCallBack, errCallBack) { ???????let self = this; ???????self.open ???????????.then(function (conn) { ???????????????return conn.createChannel(); ???????????}) ???????????.then(function (channel) { ???????????????return channel.assertQueue(queueName) ???????????????????.then(function (ok) { ???????????????????????return channel.consume(queueName, function (msg) { ???????????????????????????if (msg !== null) { ???????????????????????????????let data = msg.content.toString(); ???????????????????????????????channel.ack(msg); ???????????????????????????????receiveCallBack && receiveCallBack(data); ???????????????????????????} ???????????????????????}) ???????????????????????????.finally(function () { ???????????????????????????????setTimeout(() => { ???????????????????????????????????if (channel) { ???????????????????????????????????????channel.close(); ???????????????????????????????????} ???????????????????????????????}, 500) ???????????????????????????}); ???????????????????}) ???????????}) ???????????.catch(function () { ???????????????let num = self.index++; ???????????????if (num <= self.length - 1) { ???????????????????self.open = amqp.connect(self.hosts[num]); ???????????????} else { ???????????????????self.index = 0; ???????????????????self.open = amqp.connect(self.hosts[0]); ???????????????} ???????????}); ???}

3. 通过生产者向MQ发送一个消息,并创建队列

let mq = new RabbitMQ();mq.sendQueueMsg(‘testQueue‘, ‘my first message‘, (error) => { ???console.log(error)})

执行之后,我们打开管理平台,发现RabbbitMQ已经接受到了一条消息:

并且RabbbitMQ新增了一个队列testQueue

4. 获取指定队列的消息

let mq = new RabbitMQ();mq.receiveQueueMsg(‘testQueue‘,(msg) => { ??????console.log(msg)})// 输出结果:my first message复制代码

此时打开RabbitMQ管理平台,消息数量已经变为0

综上:我们简单讲述了消息队列及RabbitMQ相关的一些知识,以及我们如何通过nodejs来生产与消费消息,上面讲的比较简单,之后会发表更多文章讲述消息队列集群搭建及容灾的实现。

架构设计之NodeJS操作消息队列RabbitMQ

原文地址:https://www.cnblogs.com/wukong-holmes/p/9306733.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved