C# 封装RabbitMQ消息队列处理
2021-01-23 13:14
标签:连接 abort art provider 两种 程序 int 发布 mitm 现在使用.net领域使用RabbitMQ有很多解决方案,我自己使用过的有两个,一个是EasyNetQ,一个是CAP,都挺好用的,尤其是CAP,懒人推荐使用,怎么使用的文章也很多,小伙伴可以自行搜索。 最近我自己尝试根据目前手头项目的需要,自行封装一下基于RabbitMQ的使用,下面开搞,贴上我自己写的代码。 首先定义消息发布者/生产者接口: 定义订阅者/消费者接口: 定义RabbmitMQProvider 实现生产者: 实现消费者: 到这里为止,简单的实现消息队列的接受,发送,已经满足我自己当前项目的需要了。这里我用的exchange进行消息队列的生产消费,并且用fanout模式,就是一个生产者对应多个消费者,有点类似于消息广播,另外还有两种模式,可以根据需要修改。 下面是测试代码: 运行效果如图所示: OK,没有问题。 另外注意,退出程序时消息发布者和订阅者都需要Dispose()来释放连接。 C# 封装RabbitMQ消息队列处理 标签:连接 abort art provider 两种 程序 int 发布 mitm 原文地址:https://www.cnblogs.com/my85016629/p/12072401.html 1 using System.Threading.Tasks;
2
3 namespace fx.MQ
4 {
5 public interface IPublisher
6 {
7 ///
1 using System;
2 using System.Threading.Tasks;
3
4 namespace fx.MQ
5 {
6 public interface ISubscriber
7 {
8 ///
1 using RabbitMQ.Client;
2 using System;
3 using System.Collections.Generic;
4 using System.Text;
5
6 namespace fx.MQ
7 {
8 public class RabbitMQProvider
9 {
10 private readonly string _ipAddress;
11 private readonly int? _port;
12 private readonly string _username;
13 private readonly string _password;
14
15 public RabbitMQProvider(string ipAddress, int? port, string username, string password)
16 {
17 _ipAddress = ipAddress ?? throw new ArgumentException("IP地址不能为空!");
18 _port = port ?? throw new ArgumentException("端口不能为空");
19 _username = username ?? throw new ArgumentException("用户名不能为空");
20 _password = password ?? throw new ArgumentException("密码不能为空");
21
22 ConnectionFactory = new ConnectionFactory//创建连接工厂对象
23 {
24 HostName = _ipAddress,//IP地址
25 Port = (int)_port,//端口号
26 UserName = _username,//用户账号
27 Password = _password//用户密码
28 };
29 }
30
31 public IConnectionFactory ConnectionFactory { get; }
32
33 }
34 }
1 using Newtonsoft.Json;
2 using RabbitMQ.Client;
3 using System;
4 using System.Text;
5 using System.Threading.Tasks;
6
7 namespace fx.MQ
8 {
9 ///
1 using RabbitMQ.Client;
2 using RabbitMQ.Client.Events;
3 using System;
4 using System.Collections.Generic;
5 using System.Text;
6 using System.Threading.Tasks;
7
8 namespace fx.MQ
9 {
10 ///
1 using System;
2 using System.Windows.Forms;
3
4 namespace fx.MQ.TestForm
5 {
6 public partial class Form1 : Form
7 {
8 private readonly RabbitMQProvider _provider;
9 private readonly RabbitMQPublisher _publisher;
10 private readonly RabbitMQSubscriber _subscriber;
11 delegate void Callback(string msg);
12
13 public Form1()
14 {
15 _provider = new RabbitMQProvider("192.168.101.199", 5672, "admin", "admin");
16 _publisher = new RabbitMQPublisher(_provider);
17 _subscriber = new RabbitMQSubscriber(_provider);
18 //callback = new Callback(ShowMessage);
19 InitializeComponent();
20 }
21
22 private void button1_Click(object sender, EventArgs e)
23 {
24 _publisher.Publish(textBox1.Text, "public");
25 }
26
27 private void Form1_Load(object sender, EventArgs e)
28 {
29
30 _subscriber.Subscribe("public", c=> {
31 ShowMessage(c);
32 });
33 }
34
35
36 private void ShowMessage(string msg)
37 {
38 if (this.richTextBox1.InvokeRequired)
39 {
40 var cb = new Callback(ShowMessage);
41 this.Invoke(cb, new object[] { msg });
42 }
43 else
44 {
45 this.richTextBox1.Text = msg;
46 }
47 }
48 }
49 }
下一篇:NodeJS-API