分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 软件开发

【EasyNetQ 教程】- 订阅

发布时间:2023-09-06 02:08责任编辑:郭大石关键词:暂无标签

EasyNetQ订阅者订阅消息类型(消息类的.NET类型)。一旦通过调用Subscribe方法设置了对类型的订阅,就会在RabbitMQ代理上创建一个持久队列,并且该类型的任何消息都将被放置在队列中。只要连接,RabbitMQ就会将任何消息从队列发送给用户。

要订阅消息,我们需要为EasyNetQ提供在消息到达时执行的操作。我们通过传递订阅委托来做到这一点:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

现在每次发布MyMessage实例时,EasyNetQ都会调用我们的委托并将消息的Text属性打印到控制台。

您传递给Subscribe的订阅ID很重要。EasyNetQ将在RabbitMQ代理上为消息类型和订阅ID的每个唯一组合创建一个唯一的队列。

每次调用Subscribe都会创建一个新的队列使用者。如果使用相同的消息类型和订阅ID调用“订阅”两次,则将创建两个使用相同队列的消费者。然后RabbitMQ将依次循环连续消息给每个消费者。这非常适合扩展和工作共享。假设您已经创建了一个处理特定消息的服务,但它的工作负担过重。只需启动该服务的新实例(在同一台机器上,或在不同的机器上),无需配置任何内容,您就可以自动扩展。

如果使用不同的订阅ID(但具有相同的消息类型)两次调用“订阅”,则将创建两个队列,每个队列都有自己的使用者。给定类型的每条消息的副本将被路由到每个队列,因此每个消费者将获得所有消息(该类型的消息)。如果你有几个不同的服务都关心相同的消息类型,这是很好的。

编写订阅回调委托时的注意事项

当从通过EasyNetQ订阅的队列接收消息时,它们被放置在内存中队列中。单个线程位于一个循环中,从队列中获取消息并调用其Action代理。由于在单个线程上一次处理一个委托,因此应避免长时间运行的同步IO操作。尽快从代表处返回控制权。

使用SubscribeAsync

SubscribeAsync允许您的订阅者委托立即返回任务,然后异步执行长时间运行的IO操作。完成长期订阅后,只需完成任务即可。在下面的示例中,我们使用异步IO操作(DownloadStringTask)向Web服务发出请求。任务完成后,我们在控制台上写一行。

bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => ????new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500")) ???????.ContinueWith(task => ????????????Console.WriteLine("Received: ‘{0}‘, Downloaded: ‘{1}‘", ????????????????message.Text, ????????????????task.Result)));

另一个示例将导致在出现错误时抛出异常,然后导致消息被置于缺省错误队列中:

_bus.SubscribeAsync<MessageType>("Queue_Identifier", ???????????message => Task.Factory.StartNew(() => ???????????{ ???????????????// Perform some actions here ???????????????// If there is a exception it will result in a task complete but task faulted which ???????????????// is dealt with below in the continuation ???????????}).ContinueWith(task => ???????????{ ???????????????if (task.IsCompleted && !task.IsFaulted) ???????????????{ ???????????????????// Everything worked out ok ???????????????} ???????????????else ???????????????{ ???????????????????????????????????????????// Dont catch this, it is caught further up the heirarchy and results in being sent to the default error queue ???????????????????// on the broker ???????????????????throw new EasyNetQException("Message processing exception - look in the default error queue (broker)"); ???????????????} ???????????}));

取消订阅

所有subscribe方法都返回一个ISubscriptionResult。它包含描述底层IConsumerIExchangeIQueue使用的属性,如果需要,可以使用高级API进一步操作这些属性。IAdvancedBus

您可以通过在ISubscriptionResult实例或其ConsumerCancellation属性上调用Dispose来随时取消订阅者:

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);...subscriptionResult.Dispose();// this is equivalent to subscriptionResult.ConsumerCancellation.Dispose();

这将阻止EasyNetQ从队列中消耗并关闭消费者的频道。

请注意,处理IBusIAdvancedBus实例也将取消所有使用者并关闭与RabbitMQ的连接。

难道不叫subscriptionResult.Dispose()一个消息处理程序内。这将在EasyNetQ确认消费者频道上的消息subscriptionResult.Dispose()与关闭该频道的呼叫之间产生竞争条件。由于EasyNetQ的内部架构,这些调用将在不同的线程上调用,并且时序不确定。

【EasyNetQ 教程】- 订阅

原文地址:https://www.cnblogs.com/wangwust/p/9437361.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved