分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 软件开发

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

发布时间:2023-09-06 02:29责任编辑:林大明关键词:暂无标签

1.消息的优先级

假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性

Producer代码

using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole{ ???class Program ???{ ???????static void Main(string[] args) ???????{ ???????????ConnectionFactory factory = new ConnectionFactory(); ???????????factory.HostName = "39.**.**.**"; ???????????factory.Port = 5672; ???????????factory.VirtualHost = "/"; ???????????factory.UserName = "root"; ???????????factory.Password = "root"; ???????????var exchange = "change4"; ???????????var route = "route2"; ???????????var queue9 = "queue9"; ???????????using (var connection = factory.CreateConnection()) ???????????{ ???????????????using (var channel = connection.CreateModel()) ???????????????{ ???????????????????channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);            //x-max-priority属性必须设置,否则消息优先级不生效 ???????????????????channel.QueueDeclare(queue9, durable: true, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> { { "x-max-priority", 50 } }); ???????????????????channel.QueueBind(queue9, exchange, queue9); ???????????????????while(true) ???????????????????{ ???????????????????????var messagestr = Console.ReadLine(); ???????????????????????var messagepri = Console.ReadLine(); ???????????????????????var props = channel.CreateBasicProperties(); ???????????????????????props.Persistent = true; ???????????????????????props.Priority = (byte)int.Parse(messagepri);//设置消息优先级 ???????????????????????channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes(messagestr)); ???????????????????} ???????????????} ???????????} ???????} ???}}

consumer代码

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Collections.Generic;using System.Text;using System.Threading;namespace RabbitMQClient{ ???class Program ???{ ???????private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() ???????{ ???????????HostName = "39.**.**.**", ???????????Port = 5672, ???????????UserName = "root", ???????????Password = "root", ???????????VirtualHost = "/" ???????}; ???????static void Main(string[] args) ???????{ ???????????var exchange = "change4"; ???????????var route = "route2"; ???????????var queue9 = "queue9"; ???????????using (IConnection conn = rabbitMqFactory.CreateConnection()) ???????????using (IModel channel = conn.CreateModel()) ???????????{ ???????????????channel.ExchangeDeclare(exchange, "fanout", durable: true, autoDelete: false); ???????????????channel.QueueDeclare(queue9, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-max-priority", 50 } }); ???????????????channel.QueueBind(queue9, exchange, route); ???????????????channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false); ???????????????EventingBasicConsumer consumer = new EventingBasicConsumer(channel); ???????????????consumer.Received += (model, ea) => ???????????????{ ???????????????????Byte[] body = ea.Body; ???????????????????String message = Encoding.UTF8.GetString(body); ???????????????????Console.WriteLine( message); ???????????????????channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); ???????????????}; ???????????????channel.BasicConsume(queue: queue9, autoAck: false, consumer: consumer); ???????????????Console.ReadLine(); ???????????} ???????} ???}}

运行producer

在运行consumer

可以看出消息是按优先级消费的

2.死信队列

死信队列可以用来做容错机制,当我们的消息处理异常时我们可以把消息放入到死信队列中,以便后期处理,死信的产生有三种

1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);

2.当前队列中的消息数量已经超过最大长度。

3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;

看代码

using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole{ ???class Program ???{ ???????static void Main(string[] args) ???????{ ???????????ConnectionFactory factory = new ConnectionFactory(); ???????????factory.HostName = "39.**.**.**"; ???????????factory.Port = 5672; ???????????factory.VirtualHost = "/"; ???????????factory.UserName = "root"; ???????????factory.Password = "root"; ???????????var exchangeA = "changeA"; ???????????var routeA = "routeA"; ???????????var queueA = "queueA"; ???????????var exchangeD = "changeD"; ???????????var routeD = "routeD"; ???????????var queueD = "queueD"; ???????????using (var connection = factory.CreateConnection()) ???????????{ ???????????????using (var channel = connection.CreateModel()) ???????????????{ ???????????????????channel.ExchangeDeclare(exchangeD, type: "fanout", durable: true, autoDelete: false); ???????????????????channel.QueueDeclare(queueD, durable: true, exclusive: false, autoDelete: false); ???????????????????channel.QueueBind(queueD, exchangeD, routeD); ???????????????????channel.ExchangeDeclare(exchangeA, type: "fanout", durable: true, autoDelete: false); ???????????????????channel.QueueDeclare(queueA, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { ????????????????????????????????????????{ "x-dead-letter-exchange",exchangeD}, //设置当前队列的DLX ????????????????????????????????????????{ "x-dead-letter-routing-key",routeD}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 ????????????????????????????????????????{ "x-message-ttl",10000} //设置消息的存活时间,即过期时间 ????????????????????????????????????????}); ???????????????????channel.QueueBind(queueA, exchangeA, routeA); ???????????????????var properties = channel.CreateBasicProperties(); ???????????????????properties.Persistent = true; ???????????????????//发布消息 ???????????????????channel.BasicPublish(exchange: exchangeA, ????????????????????????????????????????routingKey: routeA, ????????????????????????????????????????basicProperties: properties, ????????????????????????????????????????body: Encoding.UTF8.GetBytes("message")); ???????????????} ???????????} ???????} ???}}

这样10秒后消息过期,我们可以看到queueD中有了消息


RabbitMQ与.net core(四) 消息的优先级 与 死信队列

原文地址:https://www.cnblogs.com/chenyishi/p/10242162.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved