上一篇我们说到构建了一个Rabbitmq容器
现在我们说说如何悄悄地把消息传输到队列
我们现在设计一个Rabbitmq发送消息部分的模块
先设计一个远程发送的接口
/// <summary>
/// Abstraction for sending a message to a remote endpoint.
/// </summary>
public interface IRemoteSend
{
    /// <summary>Sends <paramref name="entity"/> to the remote endpoint.</summary>
    /// <typeparam name="TEntity">Reference type of the message being sent.</typeparam>
    /// <param name="entity">The message to send.</param>
    void Send<TEntity>(TEntity entity) where TEntity : class;
}
写一个Rabbitmq配置实体
/// <summary>
/// Connection settings for a RabbitMQ broker, bound through the options pattern.
/// </summary>
public class RabbitmqConfigura
{
    /// <summary>Broker host name or IP address.</summary>
    public string Host { get; set; }

    /// <summary>
    /// Broker port. Defaults to 5672 (the standard AMQP port) so that a
    /// configuration which never sets it does not hand 0 to the client.
    /// </summary>
    public int Port { get; set; } = 5672;

    /// <summary>User name used to authenticate with the broker.</summary>
    public string User { get; set; }

    /// <summary>Password used to authenticate with the broker.</summary>
    public string Password { get; set; }

    /// <summary>
    /// Virtual host to connect to. Defaults to "/" so that a configuration
    /// which never sets it does not hand null to the client.
    /// </summary>
    public string VirtualHost { get; set; } = "/";
}
写一个实现IRemoteSend的RabbitmqRemoteSend
/// <summary>
/// RabbitMQ-backed <see cref="IRemoteSend"/>. At this stage only the
/// configuration wiring exists; <see cref="Send{TEntity}"/> is not implemented yet.
/// </summary>
public class RabbitmqRemoteSend : IRemoteSend
{
    /// <summary>Broker settings taken from the injected options.</summary>
    private RabbitmqConfigura Configura { get; }

    /// <summary>Creates the sender from the injected options.</summary>
    /// <param name="options">Options wrapper holding the broker settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public RabbitmqRemoteSend(IOptions<RabbitmqConfigura> options)
    {
        // Fail with a descriptive exception instead of a NullReferenceException.
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        Configura = options.Value;
    }

    /// <summary>Placeholder; always throws.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public void Send<TEntity>(TEntity entity) where TEntity : class
    {
        throw new NotImplementedException();
    }
}
我们再实现一下Send方法
/// <summary>
/// RabbitMQ-backed <see cref="IRemoteSend"/> that publishes to a direct exchange
/// named after the message type and waits for the publisher confirm.
/// </summary>
public class RabbitmqRemoteSend : IRemoteSend
{
    /// <summary>
    /// Value used as the <c>x-message-ttl</c> argument of the ".delay" queue.
    /// NOTE(review): RabbitMQ documents this argument in milliseconds — confirm the intended unit.
    /// </summary>
    public int DelaySend { get; set; }

    /// <summary>Broker settings taken from the injected options.</summary>
    private RabbitmqConfigura Configura { get; }

    /// <summary>Creates the sender from the injected options.</summary>
    /// <param name="options">Options wrapper holding the broker settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public RabbitmqRemoteSend(IOptions<RabbitmqConfigura> options)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        Configura = options.Value;
    }

    /// <summary>
    /// Opens a new connection and channel, declares the exchange and queues derived
    /// from <typeparamref name="TEntity"/>, publishes one message and blocks until
    /// the broker confirm arrives.
    /// </summary>
    /// <typeparam name="TEntity">Message type; its string form prefixes the exchange and queue names.</typeparam>
    /// <param name="entity">The message to send. Not serialized yet at this stage.</param>
    public void Send<TEntity>(TEntity entity) where TEntity : class
    {
        var factory = new ConnectionFactory
        {
            HostName = Configura.Host,
            Port = Configura.Port,
            UserName = Configura.User,
            Password = Configura.Password,
            VirtualHost = Configura.VirtualHost,
            AutomaticRecoveryEnabled = true,
            NetworkRecoveryInterval = TimeSpan.FromSeconds(30)
        };

        using (var connection = factory.CreateConnection())
        using (var model = connection.CreateModel()) // the channel is IDisposable too; release it deterministically
        {
            // Concatenating the Type uses Type.ToString(), exactly as the names were built before.
            var entityType = typeof(TEntity);
            var exchangeName = entityType + ".exchange";
            var routeKeyName = entityType + ".input";
            var queueName = entityType + ".input";

            // Enable publisher confirms before publishing so WaitForConfirms has something to wait on.
            model.ConfirmSelect();
            model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
            model.QueueDeclare(queueName, false, false, false);
            model.QueueBind(queueName, exchangeName, routeKeyName);

            // Companion ".delay" queue whose messages are dead-lettered back to the main
            // exchange. It is only declared here; this method never publishes to it.
            var delayArgs = new Dictionary<string, object>
            {
                { "x-message-ttl", DelaySend },
                { "x-dead-letter-exchange", exchangeName },
                { "x-dead-letter-routing-key", queueName }
            };
            model.QueueDeclare(queueName + ".delay", false, false, false, delayArgs);

            // Placeholder empty payload: `new byte[]` without a size does not compile.
            var bytes = new byte[0];

            var props = model.CreateBasicProperties();
            props.ContentType = "text/plain";
            props.DeliveryMode = 2;

            model.BasicPublish(exchangeName, routeKeyName, props, bytes);
            model.WaitForConfirms();
        }
    }
}
我们需要定义一个序列化的接口做数据编码
/// <summary>
/// Encodes a message object into the raw bytes that are put on the wire.
/// </summary>
public interface IFormattor
{
    /// <summary>Serializes <paramref name="entity"/> to a byte array.</summary>
    /// <typeparam name="TEntity">Reference type of the object being serialized.</typeparam>
    /// <param name="entity">The object to serialize.</param>
    /// <returns>The encoded representation of <paramref name="entity"/>.</returns>
    byte[] SerializeObject<TEntity>(TEntity entity) where TEntity : class;
}
写一个默认实现
/// <summary>
/// Default <see cref="IFormattor"/>: JSON text encoded as UTF-8.
/// </summary>
public class JsonFormattor : IFormattor
{
    /// <summary>
    /// Serializes <paramref name="entity"/> with <c>JsonConvert.SerializeObject</c>
    /// and returns the UTF-8 bytes of the resulting JSON string.
    /// </summary>
    /// <typeparam name="TEntity">Reference type of the object being serialized.</typeparam>
    /// <param name="entity">The object to serialize.</param>
    /// <returns>UTF-8 encoded JSON.</returns>
    public byte[] SerializeObject<TEntity>(TEntity entity) where TEntity : class
    {
        var jsonString = JsonConvert.SerializeObject(entity);
        return Encoding.UTF8.GetBytes(jsonString);
    }
}
再修改一下RabbitmqRemoteSend
/// <summary>
/// RabbitMQ-backed <see cref="IRemoteSend"/> that serializes the message with the
/// injected <see cref="IFormattor"/>, publishes it to a direct exchange named after
/// the message type and waits for the publisher confirm.
/// </summary>
public class RabbitmqRemoteSend : IRemoteSend
{
    /// <summary>
    /// Value used as the <c>x-message-ttl</c> argument of the ".delay" queue.
    /// NOTE(review): RabbitMQ documents this argument in milliseconds — confirm the intended unit.
    /// </summary>
    public int DelaySend { get; set; }

    /// <summary>Broker settings taken from the injected options.</summary>
    private RabbitmqConfigura Configura { get; }

    /// <summary>Serializer that turns the message into the published payload.</summary>
    private IFormattor Formattor { get; }

    /// <summary>Creates the sender from the injected options and serializer.</summary>
    /// <param name="options">Options wrapper holding the broker settings.</param>
    /// <param name="formattor">Serializer used to encode outgoing messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="formattor"/> is null.
    /// </exception>
    public RabbitmqRemoteSend(IOptions<RabbitmqConfigura> options, IFormattor formattor)
    {
        // Fail at construction time rather than with a NullReferenceException on first Send.
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        if (formattor == null)
        {
            throw new ArgumentNullException(nameof(formattor));
        }

        Configura = options.Value;
        Formattor = formattor;
    }

    /// <summary>
    /// Opens a new connection and channel, declares the exchange and queues derived
    /// from <typeparamref name="TEntity"/>, publishes the serialized
    /// <paramref name="entity"/> and blocks until the broker confirm arrives.
    /// </summary>
    /// <typeparam name="TEntity">Message type; its string form prefixes the exchange and queue names.</typeparam>
    /// <param name="entity">The message to serialize and send.</param>
    public void Send<TEntity>(TEntity entity) where TEntity : class
    {
        var factory = new ConnectionFactory
        {
            HostName = Configura.Host,
            Port = Configura.Port,
            UserName = Configura.User,
            Password = Configura.Password,
            VirtualHost = Configura.VirtualHost,
            AutomaticRecoveryEnabled = true,
            NetworkRecoveryInterval = TimeSpan.FromSeconds(30)
        };

        using (var connection = factory.CreateConnection())
        using (var model = connection.CreateModel()) // the channel is IDisposable too; release it deterministically
        {
            // Concatenating the Type uses Type.ToString(), exactly as the names were built before.
            var entityType = typeof(TEntity);
            var exchangeName = entityType + ".exchange";
            var routeKeyName = entityType + ".input";
            var queueName = entityType + ".input";

            // Enable publisher confirms before publishing so WaitForConfirms has something to wait on.
            model.ConfirmSelect();
            model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
            model.QueueDeclare(queueName, false, false, false);
            model.QueueBind(queueName, exchangeName, routeKeyName);

            // Companion ".delay" queue whose messages are dead-lettered back to the main
            // exchange. It is only declared here; this method never publishes to it.
            var delayArgs = new Dictionary<string, object>
            {
                { "x-message-ttl", DelaySend },
                { "x-dead-letter-exchange", exchangeName },
                { "x-dead-letter-routing-key", queueName }
            };
            model.QueueDeclare(queueName + ".delay", false, false, false, delayArgs);

            var bytes = Formattor.SerializeObject<TEntity>(entity);

            var props = model.CreateBasicProperties();
            props.ContentType = "text/plain";
            props.DeliveryMode = 2;

            model.BasicPublish(exchangeName, routeKeyName, props, bytes);
            model.WaitForConfirms();
        }
    }
}
我们加入asp.net core测试一下
在ConfigureServices内增加代码
#region Rabbitmq
// NOTE(review): broker credentials are hard-coded for this demo only;
// load them from configuration or a secret store in a real deployment.
services.Configure<RabbitmqConfigura>(p =>
{
    p.User = "admin";
    p.Password = "123456";
    p.Host = "127.0.0.1";
});

// Register the serializer and the sender once per scope.
services.AddScoped<IFormattor, JsonFormattor>();
services.AddScoped<IRemoteSend, RabbitmqRemoteSend>();
#endregion
在Configure增加代码
// Smoke test: resolve the scoped sender and publish one sample message.
// The scope is IDisposable, so wrap it in `using` to release the scoped
// services once the message has been sent.
using (var scope = app.ApplicationServices.CreateScope())
{
    var remoteSend = scope.ServiceProvider.GetRequiredService<IRemoteSend>();
    remoteSend.Send(new User
    {
        Name = "hello",
        Account = "account"
    });
}
这段代码是完全用来测试的
User实体对象
/// <summary>
/// Sample message entity used to exercise the sender.
/// </summary>
public class User
{
    /// <summary>User's name.</summary>
    public string Name { get; set; }

    /// <summary>User's account.</summary>
    public string Account { get; set; }
}
我们再看看Rabbitmq内
看看内容
当asp.net core偶遇docker一(模型验证和Rabbitmq 二)
原文地址:https://www.cnblogs.com/NCoreCoder/p/9939594.html