1.topic类型的Exchange
我们之前说过Topic类型的Exchange是direct类型的模糊匹配模式,可以通过routingKey来实现模糊消费message,topic的模糊匹配有两种模式:
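1. 使用*来匹配一个单词(单词之间用"."分隔,例如 man.* 可以匹配 man.chen,但不能匹配 man.chen.long)
2. 使用#来匹配0个或多个单词(例如 man.# 可以匹配 man、man.chen 和 man.chen.long)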
我们来看代码
消费端
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQClient
{
    /// <summary>
    /// Consumer side of the topic-exchange demo. Binds the queue "queueman" to the
    /// topic exchange "changeAll" with the pattern "man.#" and prints every message
    /// delivered to that queue until the user presses Enter.
    /// </summary>
    class Program
    {
        // Broker connection settings. The host name is a masked placeholder and the
        // credentials are demo values; replace them before running.
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        /// <summary>
        /// Declares the exchange and queue, binds them, and consumes with manual acknowledgements.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var exchangeAll = "changeAll";
            var queueman = "queueman";
            // "#" matches zero or more dot-separated words, so any routing key
            // whose first word is "man" is routed to this queue.
            var quemankey = "man.#";

            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                // Declare both sides so the consumer can be started before the producer.
                channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false);
                channel.QueueDeclare(queueman, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queueman, exchangeAll, quemankey);

                // Limit the number of unacknowledged deliveries outstanding for this consumer.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // NOTE(review): this assignment requires ea.Body to be byte[]; newer
                    // RabbitMQ.Client versions expose ReadOnlyMemory<byte> instead, where
                    // ea.Body.ToArray() would be needed. Confirm against the package version in use.
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(message);

                    // autoAck is false below, so each delivery is acknowledged explicitly.
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queueman, autoAck: false, consumer: consumer);

                // Keep the connection open (and the consumer running) until Enter is pressed.
                Console.ReadLine();
            }
        }
    }
}
生产者代码
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    /// <summary>
    /// Producer side of the topic-exchange demo. Publishes four persistent messages to
    /// the topic exchange "changeAll", each using its own routing key as the message body.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, declares the exchange and publishes the demo messages.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Broker connection settings. The host name is a masked placeholder and the
            // credentials are demo values; replace them before running.
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchangeAll = "changeAll";

            // Routing key layout: gender.surname.hairLength
            var keymanA = "man.chen.long";
            var keymanB = "man.liu.long";
            var keymanC = "woman.liu.long";
            var keymanD = "woman.chen.short";
            var routingKeys = new[] { keymanA, keymanB, keymanC, keymanD };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false);

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // Publish one message per routing key, in the same order as before;
                    // the body is the routing key itself so the consumer output shows
                    // which keys were routed to its queue.
                    foreach (var routingKey in routingKeys)
                    {
                        channel.BasicPublish(exchange: exchangeAll,
                            routingKey: routingKey,
                            basicProperties: properties,
                            body: Encoding.UTF8.GetBytes(routingKey));
                    }
                }
            }
        }
    }
}
我们先运行消费端再运行生产端,结果如下
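消费端:由于队列绑定的routingKey是man.#,所以只会收到man.chen.long和man.liu.long两条消息,woman.liu.long和woman.chen.short不会被路由到该队列。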
2.headers类型的exchange
生产者代码
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    /// <summary>
    /// Producer side of the headers-exchange demo. Publishes one persistent message
    /// carrying the header sex=man to the headers exchange "changeHeader".
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, declares the exchange and publishes the demo message.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Broker connection settings. The host name is a masked placeholder and the
            // credentials are demo values; replace them before running.
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchangeAll = "changeHeader";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false);

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    // A headers exchange routes on these message headers rather than on the routing key.
                    properties.Headers = new Dictionary<string, object>
                    {
                        { "sex", "man" }
                    };

                    // Publish the message; the routing key is left empty.
                    channel.BasicPublish(exchange: exchangeAll,
                        routingKey: "",
                        basicProperties: properties,
                        body: Encoding.UTF8.GetBytes("hihihi"));
                }
            }
        }
    }
}
消费端代码
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQClient
{
    /// <summary>
    /// Consumer side of the headers-exchange demo. Binds the queue "queueHeader" to the
    /// headers exchange "changeHeader" with the binding argument sex=man and prints every
    /// message delivered to that queue until the user presses Enter.
    /// </summary>
    class Program
    {
        // Broker connection settings. The host name is a masked placeholder and the
        // credentials are demo values; replace them before running.
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        /// <summary>
        /// Declares the exchange and queue, binds them by header, and consumes with manual acknowledgements.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var exchangeAll = "changeHeader";
            var queueman = "queueHeader";

            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                // Declare both sides so the consumer can be started before the producer.
                channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false);
                channel.QueueDeclare(queueman, durable: true, exclusive: false, autoDelete: false);

                // Bind on the header sex=man; the routing key is left empty.
                // NOTE(review): no "x-match" argument is supplied, so the broker's default
                // matching mode applies (documented as "all") — confirm if more headers are added.
                channel.QueueBind(queueman, exchangeAll, "", new Dictionary<string, object> { { "sex", "man" } });

                // Limit the number of unacknowledged deliveries outstanding for this consumer.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // NOTE(review): this assignment requires ea.Body to be byte[]; newer
                    // RabbitMQ.Client versions expose ReadOnlyMemory<byte> instead, where
                    // ea.Body.ToArray() would be needed. Confirm against the package version in use.
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(message);

                    // autoAck is false below, so each delivery is acknowledged explicitly.
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queueman, autoAck: false, consumer: consumer);

                // Keep the connection open (and the consumer running) until Enter is pressed.
                Console.ReadLine();
            }
        }
    }
}
RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange
原文地址:https://www.cnblogs.com/chenyishi/p/10242540.html