分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 技术分享

NetMQ 发布订阅模式 Publisher-Subscriber

发布时间:2023-09-06 01:17责任编辑:郭大石关键词:暂无标签

第一部分引用于:点击打开

1:简单介绍

PUB-SUB模式一般处理的都不是系统的关键数据。发布者不关注订阅者是否收到发布的消息,订阅者也不知道自己是否收到了发布者发出的所有消息。你也不知道订阅者何时开始收到消息。类似于广播,收音机。因此逻辑上,它都不是可靠的。这个可以通过与请求响应模型组合来解决。


图1:简单的发布订阅模式


图2:与请求响应模式组合的发布订阅模式

2:案例

定义IPublishser接口

namespace NetMQDemoPublisher{ ???public interface IPublisher:IDisposable ???{ ???????/// <summary> ???????/// 发布消息 ???????/// </summary> ???????/// <param name="topicName">主题</param> ???????/// <param name="data">内容</param> ???????void Publish(string topicName, string data); ???}}

Publisher实现类

namespace NetMQDemoPublisher{ ???public class Publisher:IPublisher ???{ ???????private object _lockObject = new object(); ???????private PublisherSocket _publisherSocket; ???????public Publisher(string endPoint) ???????{ ???????????_publisherSocket = new PublisherSocket(); ???????????_publisherSocket.Options.SendHighWatermark = 1000; ???????????_publisherSocket.Bind(endPoint); ???????} ???????#region Implementation of IDisposable ???????/// <summary> ???????/// 执行与释放或重置非托管资源相关的应用程序定义的任务。 ???????/// </summary> ???????public void Dispose() ???????{ ???????????lock (_lockObject) ???????????{ ???????????????_publisherSocket.Close(); ???????????????_publisherSocket.Dispose(); ???????????} ???????} ???????/// <summary> ???????/// 发布消息 ???????/// </summary> ???????/// <param name="topicName">主题</param> ???????/// <param name="data">内容</param> ???????public void Publish(string topicName, string data) ???????{ ???????????lock (_lockObject) ???????????{ ???????????????_publisherSocket.SendMoreFrame(topicName).SendFrame(data); ???????????} ???????} ???????#endregion ???}}

Publisher窗口界面

界面中实现的功能代码

namespace NetMQDemoPublisher{ ???public partial class PublisherForm : Form ???{ ???????private IPublisher publisher; ???????public PublisherForm() ???????{ ???????????InitializeComponent(); ???????????publisher = new Publisher("tcp://127.0.0.1:8888"); ???????} ???????private void button1_Click(object sender, EventArgs e) ???????{ ???????????string strContent = textBox1.Text; ???????????ListViewItem item = new ListViewItem(string.Format("topic:NetMQ,Data:{0}", ?strContent)); ???????????listView1.Items.Add(item); ???????????publisher.Publish("NetMQ", strContent); ???????} ???}}

定义ISubscriber接口

namespace NetMQDemoSubscriber{ ???public interface ISubscriber:IDisposable ???{ ???????/// <summary> ???????/// 事件 ???????/// </summary> ???????event Action<string, string> Nofity; ???????/// <summary> ???????/// 注册订阅主题 ???????/// </summary> ???????/// <param name="topics"></param> ???????void RegisterSubscriber(List<string> topics); ???????/// <summary> ???????/// 注册订阅 ???????/// </summary> ???????void RegisterSbuscriberAll(); ???????/// <summary> ???????/// 移除所有订阅消息,并关闭 ???????/// </summary> ???????void RemoveSbuscriberAll(); ???}}

Subscriber实现类

namespace NetMQDemoSubscriber{ ???public class Subscriber:ISubscriber ???{ ???????private SubscriberSocket _subscriberSocket = null; ???????private string _endpoint = @"tcp://127.0.0.1:9876"; ???????public Subscriber(string endPoint) ???????{ ???????????_subscriberSocket = new SubscriberSocket(); ???????????_endpoint = endPoint; ???????} ???????#region Implementation of IDisposable ???????/// <summary> ???????/// 执行与释放或重置非托管资源相关的应用程序定义的任务。 ???????/// </summary> ???????public void Dispose() ???????{ ???????????throw new NotImplementedException(); ???????} ???????#endregion ???????#region Implementation of ISubscriber ???????public event Action<string, string> Nofity = delegate { }; ???????/// <summary> ???????/// 注册订阅主题 ???????/// </summary> ???????/// <param name="topics"></param> ???????public void RegisterSubscriber(List<string> topics) ???????{ ???????????InnerRegisterSubscriber(topics); ???????} ???????/// <summary> ???????/// 注册订阅 ???????/// </summary> ???????public void RegisterSbuscriberAll() ???????{ ???????????InnerRegisterSubscriber(); ???????} ???????/// <summary> ???????/// 移除所有订阅消息,并关闭 ???????/// </summary> ???????public void RemoveSbuscriberAll() ???????{ ???????????InnerStop(); ???????} ???????#endregion ???????#region 内部实现 ???????/// <summary> ???????/// 注册订阅消息 ???????/// </summary> ???????/// <param name="topics">订阅的主题</param> ???????private void InnerRegisterSubscriber(List<string> topics = null) ???????{ ???????????InnerStop(); ???????????_subscriberSocket = new SubscriberSocket(); ???????????_subscriberSocket.Options.ReceiveHighWatermark = 1000; ???????????_subscriberSocket.Connect(_endpoint); ???????????if (null == topics) ???????????{ ???????????????_subscriberSocket.SubscribeToAnyTopic(); ???????????} ???????????else ???????????{ ???????????????topics.ForEach(item => _subscriberSocket.Subscribe(item)); ???????????} ???????????Task.Factory.StartNew(() => ???????????{ ???????????????while (true) ???????????????{ ???????????????????string messageTopicReceived = _subscriberSocket.ReceiveFrameString(); ???????????????????string messageReceived = _subscriberSocket.ReceiveFrameString(); ???????????????????Nofity(messageTopicReceived, messageReceived); ???????????????} ???????????}); ???????} ???????/// <summary> ???????/// 关闭订阅 ???????/// </summary> ???????private void InnerStop() ???????{ ???????????_subscriberSocket.Close(); ???????} ???????#endregion ???}}

Subscriber窗口界面

窗体功能代码

namespace NetMQDemoSubscriber{ ???public partial class SubscriberForm : Form ???{ ???????private ISubscriber subscriber; ???????public SubscriberForm() ???????{ ???????????InitializeComponent(); ???????} ???????private void SubscriberForm_Load(object sender, EventArgs e) ???????{ ???????????subscriber = new Subscriber("tcp://127.0.0.1:8888"); ???????????subscriber.RegisterSbuscriberAll(); ???????????subscriber.Nofity+= delegate(string s, string s1) ???????????{ ???????????????ListViewItem item = new ListViewItem(string.Format("topic:{0},Data:{1}", s, s1)); ???????????????listView1.Items.Add(item); ???????????}; ???????} ???}}

运行后,Publiser开启一个,Subscirber开启三个,进行测试如图

源码下载

如果觉得文章好,记得关注一下公众号哟!

NetMQ 发布订阅模式 Publisher-Subscriber

原文地址:http://www.cnblogs.com/mzy-google/p/7665656.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved