C#Nanomsg订阅发布

2021-01-20 07:12

阅读:504

标签:负载   nts   message   als   await   请求   服务集   get   one   

Nanomsg简介

nanomsg是一个socket library,提供了几种常见的通讯模式,目前可用可扩展的协议有:

PAIR 一对一

BUS 多对多

REQREP 允许构建无状态服务集群来处理用户请求

PUBSUB 将消息发给订阅消息的用户

PIPELINE 汇总来自多个来源的消息,并在目的点之间负载均衡。

SURVEY 允许一次查询多个应用程序的状态

Nanomsg订阅发布使用示例

C#中使用Nanomsg非常方便,只需要NuGet里面查找Nanomsg,选择合适版本安装即可,这里选择的是NNanomsg,

版本是0.5.2.使用起来非常简单

需要server发布信息,client订阅信息。

nanomsg使用进程间通信,server端只需要bind某个地址即可,然后通过PublishSocket send数据就可以了,

而client端通过SubscribeSocket订阅一个或者多个topic,然后connect通讯地址,通过Receive接收数据即可。

Nanomsg是将topic和消息连接在一起,只需要client订阅的topic和整个消息的前面部分一致就可以收到数据。

实际代码如下:

server端:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using NNanomsg.Protocols;

namespace WPFNanoMsgServer
{
    /// 
    /// MainWindow.xaml 的交互逻辑
    /// 
    public partial class MainWindow : Window
    {
        PublishSocket serversocket = new PublishSocket();
        string address;

        public MainWindow()
        {
            InitializeComponent();
        }

        private void button_Click(object sender, RoutedEventArgs e)
        {
            //获取参数并启动
            serversocket.Bind(textBox_address.Text);
        }

        private void button1_Click(object sender, RoutedEventArgs e)
        {
            string topic = textBox_topic.Text;
            string content = textBox_content.Text;
            string message = topic + "|" + content;
            byte[] b = Encoding.UTF8.GetBytes(message);
            serversocket.Send(b);
            if(textBlock_log.Text.Count()>5000)
            {
                textBlock_log.Text = "";
            }
            textBlock_log.Text ="publish msg:"+ message + "\n" + textBlock_log.Text;
        }
    }
}

对应的UI xaml 内容如下:


        

        
        

client端代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using System.Windows.Threading;
using NNanomsg.Protocols;

namespace WPFNanoMsgClient
{
    /// 
    /// MainWindow.xaml 的交互逻辑
    /// 
    public partial class MainWindow : Window
    {
        DispatcherTimer timer;
        static string log = "";
        void UpdateLog(object sender, EventArgs e)
        {
            textBlock_log.Text = log;
        }
        void StartTimer()
        {
            timer = new DispatcherTimer();
            timer.Interval = TimeSpan.FromMilliseconds(200);
            timer.Tick += UpdateLog;
            timer.Start();
        }
        public MainWindow()
        {
            InitializeComponent();
            StartTimer();
        }

        bool runFlag = false;
        static string topicstr;
        static string serveraddress;

        public static void DealReceive()
        {
           
        }
        static  void DealReceivedata()
        {
            //await Task.Run({
            //});
            using (var s = new SubscribeSocket())
            {
                //Needs to match the first portion of the message being received.

                //需要订阅多个数据
                //string topicstr = textBox_topic.Text;
                string[] topics = topicstr.Split('#');
                foreach (var singletopin in topics)
                {
                    s.Subscribe(singletopin);
                }


                s.Connect(serveraddress);
                while (true)
                {
                    byte[] b = s.Receive();
                    if (b != null)
                    {
                        Console.WriteLine("Received: " + Encoding.ASCII.GetString(b));
                        if (log.Count() > 500)
                        {
                            log = "";
                        }
                        else
                        {
                            log = "received:" + Encoding.UTF8.GetString(b) + "\n" + log;
                        }
                    }
                    else
                    {
                        Console.WriteLine("x");
                    }
                }
            }
        }

        Thread th = new Thread(DealReceivedata);
        private void button_Click(object sender, RoutedEventArgs e)
        {
            topicstr = textBox_topic.Text;
            serveraddress = textBox_address.Text;
            if(runFlag)
            {
                th.Abort();                
            }
            else
            {
                th.Start();
            }
            runFlag = !runFlag;
                 
        }
    }
}

UI xaml:



    

参考资料

https://blog.csdn.net/u012635648/article/details/78665680

C#Nanomsg订阅发布

标签:负载   nts   message   als   await   请求   服务集   get   one   

原文地址:https://www.cnblogs.com/youdias/p/12141728.html


评论


亲,登录后才可以留言!