分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 软件开发

RabbitMQ与.net core(二)Producer与Exchange

发布时间:2023-09-06 02:29责任编辑:傅花花关键词:暂无标签
原文:RabbitMQ与.net core(二)Producer与Exchange

Producer:消息的生产者,也就是创建消息的对象

Exchange:消息的接受者,也就是用来接收消息的对象,Exchange接收到消息后将消息按照规则发送到与他绑定的Queue中。下面我们来定义一个Producer与Exchange。

1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包

2.创建Exchange

using RabbitMQ.Client;namespace RabbitMQConsole{ ???class Program ???{ ???????static void Main(string[] args) ???????{ ???????????ConnectionFactory factory = new ConnectionFactory(); ???????????factory.HostName = "39.**.**.**"; ???????????factory.Port = 5672; ???????????factory.VirtualHost = "/"; ???????????factory.UserName = "root"; ???????????factory.Password = "root"; ???????????var exchange = "change2"; ???????????var route = "route2"; ???????????var queue = "queue2"; ???????????using (var connection = factory.CreateConnection()) ???????????{ ???????????????using (var channel = connection.CreateModel()) ???????????????{ ???????????????????channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false); ??//创建Exchange ???????????????????????????????????} ???????????} ???????} ???}}

可以看到Echange的参数有:

type:可选项为,fanout,direct,topic,headers。区别如下:

    fanout:发送到所有与当前Exchange绑定的Queue中

    direct:发送到与消息的routeKey相同的Rueue中

    topic:fanout的模糊版本

    headers:发送到与消息的header属性相同的Queue中

durable:持久化

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

 运行程序,可以在可视化界面看到change2

接下来我们可以创建与change2绑定的queue

3.创建Queue

 ???????????????using (var channel = connection.CreateModel()) ???????????????{ ???????????????????channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false); ???????????????????channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); ?#创建queue2 ???????????????????channel.QueueBind(queue, exchange, route);  #将queue2绑定到exchange2 ???????????????}

可以看到Echange的参数有:

durable:持久化

exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

去可视化界面看Queue

4.发送消息

 ???????????????using (var channel = connection.CreateModel()) ???????????????{ ???????????????????channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false); ???????????????????channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); ???????????????????channel.QueueBind(queue, exchange, route); ???????????????????var props = channel.CreateBasicProperties(); ???????????????????props.Persistent = true; #持久化 ???????????????????channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); ???????????????}

5.消费消息

using RabbitMQ.Client;using System;using System.Text;namespace RabbitMQClient{ ???class Program ???{ ???????private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() ???????{ ???????????HostName = "39.**.**.**", ???????????Port = 5672, ???????????UserName = "root", ???????????Password = "root", ???????????VirtualHost = "/" ???????}; ???????static void Main(string[] args) ???????{ ???????????var exchange = "change2"; ???????????var route = "route2"; ???????????var queue = "queue2"; ???????????using (IConnection conn = rabbitMqFactory.CreateConnection()) ???????????using (IModel channel = conn.CreateModel()) ???????????{ ???????????????channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false); ???????????????channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); ???????????????channel.QueueBind(queue, exchange, route); ???????????????while (true) ???????????????{ ???????????????????var message = channel.BasicGet(queue, true); ?#第二个参数说明自动释放消息,如为false需手动释放消息 ???????????????????if(message!=null) ???????????????????{ ???????????????????????var msgBody = Encoding.UTF8.GetString(message.Body); ???????????????????????Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); ???????????????????} ???????????????????System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); ???????????????} ???????????} ???????} ???}}

运行查看结果

查看可视化界面

6.手动释放消息

 ???????????????while (true) ???????????????{ ???????????????????var message = channel.BasicGet(queue, false);#设置为手动释放 ???????????????????if(message!=null) ???????????????????{ ???????????????????????var msgBody = Encoding.UTF8.GetString(message.Body); ???????????????????????Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); ???????????????????} ???????????????????channel.BasicAck(message.DeliveryTag, false); #手动释放 ???????????????????System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); ???????????????}

我们再发一条消息,然后开始消费,加个断点调试一下

查看一下Queue中消息状态

然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态

这么说来只要不走到 channel.BasicAck(message.DeliveryTag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态

如图已经被释放了

7.让失败的消息回到队列中

 ???????????????while (true) ???????????????{ ???????????????????var message = channel.BasicGet(queue, false); ???????????????????if(message!=null) ???????????????????{ ???????????????????????var msgBody = Encoding.UTF8.GetString(message.Body); ???????????????????????Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); ???????????????????????Console.WriteLine(message.DeliveryTag);    #当前消息被处理的次序数 ???????????????????????if (1==1) ???????????????????????????channel.BasicReject(message.DeliveryTag, true); ???????????????????} ???????????????????????????????????????System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); ???????????????}

重新发送4条消息

开始消费

我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾

8.监听消息

 using (IConnection conn = rabbitMqFactory.CreateConnection()) ???????????using (IModel channel = conn.CreateModel()) ???????????{ ???????????????channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false); ???????????????channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); ???????????????channel.QueueBind(queue, exchange, route); ???????????????channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false); ?#一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷 ???????????????EventingBasicConsumer consumer = new EventingBasicConsumer(channel); ???????????????consumer.Received += (model, ea) => ???????????????{ ???????????????????Byte[] body = ea.Body; ???????????????????String message = Encoding.UTF8.GetString(body); ???????????????????Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId); ???????????????????channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); ???????????????}; ???????????????channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer); ???????????????Console.ReadLine(); ???????????}

RabbitMQ与.net core(二)Producer与Exchange

原文地址:https://www.cnblogs.com/lonelyxmas/p/10237139.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved