分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange

发布时间:2023-09-06 02:29责任编辑:白小东关键词:暂无标签

1.topic类型的Exchange

我们之前说过Topic类型的Exchange是direct类型的模糊查询模式,可以通过routkey来实现模糊消费message,topic的模糊匹配有两种模式:

1. 使用*来匹配一个单词

2.使用#来匹配0个或多个单词

我们来看代码

消费端

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Collections.Generic;using System.Text;using System.Threading;namespace RabbitMQClient{ ???class Program ???{ ???????private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() ???????{ ???????????HostName = "39.**.**.**", ???????????Port = 5672, ???????????UserName = "root", ???????????Password = "root", ???????????VirtualHost = "/" ???????}; ???????static void Main(string[] args) ???????{ ???????????var exchangeAll = "changeAll"; ???????????var queueman = "queueman"; ???????????var quemankey = "man.#"; ???????????using (IConnection conn = rabbitMqFactory.CreateConnection()) ???????????using (IModel channel = conn.CreateModel()) ???????????{ ???????????????channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false); ???????????????channel.QueueDeclare(queueman, durable: true, exclusive: false, autoDelete: false); ???????????????channel.QueueBind(queueman, exchangeAll, quemankey); ???????????????channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false); ???????????????EventingBasicConsumer consumer = new EventingBasicConsumer(channel); ???????????????consumer.Received += (model, ea) => ???????????????{ ???????????????????Byte[] body = ea.Body; ???????????????????String message = Encoding.UTF8.GetString(body); ???????????????????Console.WriteLine( message); ???????????????????channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); ???????????????}; ???????????????channel.BasicConsume(queue: queueman, autoAck: false, consumer: consumer); ???????????????Console.ReadLine(); ???????????} ???????} ???}}

生产者代码

using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole{ ???class Program ???{ ???????static void Main(string[] args) ???????{ ???????????ConnectionFactory factory = new ConnectionFactory(); ???????????factory.HostName = "39.**.**.**"; ???????????factory.Port = 5672; ???????????factory.VirtualHost = "/"; ???????????factory.UserName = "root"; ???????????factory.Password = "root"; ???????????var exchangeAll = "changeAll"; ???????????//性别.姓氏.头发长度 ???????????var keymanA = "man.chen.long"; ???????????var keymanB = "man.liu.long"; ???????????var keymanC = "woman.liu.long"; ???????????var keymanD = "woman.chen.short"; ???????????using (var connection = factory.CreateConnection()) ???????????{ ???????????????using (var channel = connection.CreateModel()) ???????????????{ ???????????????????channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false); ???????????????????var properties = channel.CreateBasicProperties(); ???????????????????properties.Persistent = true; ???????????????????//发布消息 ???????????????????channel.BasicPublish(exchange: exchangeAll, ???????????????????routingKey: keymanA, ???????????????????basicProperties: properties, ???????????????????body: Encoding.UTF8.GetBytes(keymanA)); ???????????????????channel.BasicPublish(exchange: exchangeAll, ????????????????????routingKey: keymanB, ????????????????????basicProperties: properties, ????????????????????body: Encoding.UTF8.GetBytes(keymanB)); ???????????????????channel.BasicPublish(exchange: exchangeAll, ????????????????????routingKey: keymanC, ????????????????????basicProperties: properties, ????????????????????body: Encoding.UTF8.GetBytes(keymanC)); ???????????????????channel.BasicPublish(exchange: exchangeAll, ????????????????????routingKey: keymanD, ????????????????????basicProperties: properties, ????????????????????body: Encoding.UTF8.GetBytes(keymanD)); ???????????????} ???????????} ???????} ???}}

我们先运行消费端再运行生产段,结果如下

消费端:

2.headers类型的exchange

生成者代码

using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole{ ???class Program ???{ ???????static void Main(string[] args) ???????{ ???????????ConnectionFactory factory = new ConnectionFactory(); ???????????factory.HostName = "39.**.**.**"; ???????????factory.Port = 5672; ???????????factory.VirtualHost = "/"; ???????????factory.UserName = "root"; ???????????factory.Password = "root"; ???????????var exchangeAll = "changeHeader"; ???????????using (var connection = factory.CreateConnection()) ???????????{ ???????????????using (var channel = connection.CreateModel()) ???????????????{ ???????????????????channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false); ???????????????????var properties = channel.CreateBasicProperties(); ???????????????????properties.Persistent = true; ???????????????????properties.Headers = new Dictionary<string, object> { ???????????????????????{ "sex","man"} ???????????????????}; ???????????????????//发布消息 ???????????????????channel.BasicPublish(exchange: exchangeAll, ???????????????????routingKey: "", ???????????????????basicProperties: properties, ???????????????????body: Encoding.UTF8.GetBytes("hihihi")); ???????????????} ???????????} ???????} ???}}

消费端代码

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Collections.Generic;using System.Text;using System.Threading;namespace RabbitMQClient{ ???class Program ???{ ???????private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() ???????{ ???????????HostName = "39.**.**.**", ???????????Port = 5672, ???????????UserName = "root", ???????????Password = "root", ???????????VirtualHost = "/" ???????}; ???????static void Main(string[] args) ???????{ ???????????var exchangeAll = "changeHeader"; ???????????var queueman = "queueHeader"; ???????????using (IConnection conn = rabbitMqFactory.CreateConnection()) ???????????using (IModel channel = conn.CreateModel()) ???????????{ ???????????????channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false); ???????????????channel.QueueDeclare(queueman, durable: true, exclusive: false, autoDelete: false); ???????????????channel.QueueBind(queueman, exchangeAll, "",new Dictionary<string, object> { { "sex","man" } }); ???????????????channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false); ???????????????EventingBasicConsumer consumer = new EventingBasicConsumer(channel); ???????????????consumer.Received += (model, ea) => ???????????????{ ???????????????????Byte[] body = ea.Body; ???????????????????String message = Encoding.UTF8.GetString(body); ???????????????????Console.WriteLine( message); ???????????????????channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); ???????????????}; ???????????????channel.BasicConsume(queue: queueman, autoAck: false, consumer: consumer); ???????????????Console.ReadLine(); ???????????} ???????} ???}}

RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange

原文地址:https://www.cnblogs.com/chenyishi/p/10242540.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved