用C#实现TCP连接池
标签:rlock ted cli 代码 大连 action == 过期 abc
背景
最近对接一个TCP协议,这个协议定义的非常好,有头标识和校验位!但是,接口提供方定的通信协议还是欠缺考虑...正常情况下,这个协议是没有问题的,但是在高并发的情况下,客户端方就需要点真功夫了。
分析
该通信协议中,没有使用事务号,也就是说,用同一条连接连续发送两次请求,你不知道返回的响应数据是哪个请求的。你可能会说,第一个响应是第一个请求的,第二个响应是第二个请求的!这是绝对的理想情况,服务器处理所有请求的耗时一样,网络没有抖动。如果耗时和网络抖动都无法确定的情况下,响应顺序与请求顺序就有可能不一致!结论就是,如果只有一个连接,那么所有请求只能排队,一个请求处理完才能发送下一个。那如果高并发请求比较多怎么办?
如何用没有事务号的通信协议实现高并发?Http?bingo!Http的请求方式就是解决这个问题的榜样!Http就是在一个连接中一次只发送一个请求,请求没有收到响应或者超时断开的情况下,是不会发送第二个请求的。那么Http是怎么并发的?通过同时发送多个Http请求来实现并发!以Http为榜样,那么就可以通过一个TCP连接一次发送一个请求,这个请求没有结束之前不再发送新的请求,如此来保证请求与响应的匹配,通过多个连接同时发送多个请求来实现高并发。
解决思路有了,还得考虑下性能。肯定不能每次发送请求就新建一个连接,请求结束就断开连接,这样很不TCP!那就考虑复用TCP连接,请求未结束之前,这个连接不可用,请求结束后不断开连接,可供新请求使用。并发量肯定有低峰和高峰,低峰的时候,不需要保留太多的连接;高峰的时候,如果不加以控制,TCP连接数量会飙升,也需要加以控制!
实现
基于以上分析,需要写一个TCP连接池,该连接池可配置最少、最多连接数量,以及连接可空闲时间。当需要发送请求时,如果没有可用连接,并且池内连接数量不超过最大数量,就创建新的连接;当池内数量达到上限,并且需要发送请求时,需要阻塞,直至有可用的连接;当连接空闲时间达到设置的可空闲时间,并且池内连接数量大于最小值时,清理掉多余的连接。
以下是连接池类,ConnectionInfo类和接口IConnection没有发,但是不影响看连接池代码。欢迎拍砖!
1 ///
2 /// 连接池
3 /// 对于不能单连并发的,用此连接池实现多连接并发,可控制最少最大连接数量
4 /// 连接保活
5 ///
6 public class ConnectionPool : IDisposable
7 {
8 private ConcurrentQueue m_IdleConnections;
9 private ConcurrentDictionarylong, ConnectionInfo> m_Connections;
10 private object m_ConnectionLock = new object();
11
12 private int m_Max;
13 private int m_Min;
14 private TimeSpan m_Expired;
15 private Func m_ConnectionFactory;
16
17 private bool m_IsConnected = false;
18 ///
19 /// 是否连接
20 ///
21 public bool IsConnected
22 {
23 get
24 {
25 return m_IsConnected;
26 }
27 set
28 {
29 if (m_IsConnected != value)
30 {
31 m_IsConnected = value;
32 if (m_IsConnected)
33 {
34 Connected?.Invoke(this, EventArgs.Empty);
35 }
36 else
37 {
38 Disconnected?.Invoke(this, EventArgs.Empty);
39 }
40 }
41 }
42 }
43
44 ///
45 /// 构造
46 ///
47 /// 最小连接数
48 /// 最大连接数
49 /// 过期时间 空闲时间超过此值则会被清理掉
50 /// 创建连接的回调
51 public ConnectionPool(int minConnection, int maxConnection, TimeSpan expired, Func connectionFactory)
52 {
53 m_Min = minConnection;
54 m_Max = maxConnection;
55 m_Expired = expired;
56 m_ConnectionFactory = connectionFactory;
57 m_Connections = new ConcurrentDictionarylong, ConnectionInfo>();
58 m_IdleConnections = new ConcurrentQueue();
59 StartClear();
60 }
61
62 private long m_No = 0;
63 private object m_NoLock = new object();
64
65 ///
66 /// 生成新的编号
67 ///
68 ///
69 private long NewNo()
70 {
71 long no;
72 lock (m_NoLock)
73 {
74 no = ++m_No;
75 }
76 return no;
77 }
78
79 ///
80 /// 抓取连接
81 ///
82 ///
83 private ConnectionInfo GrabConnection()
84 {
85 //判断空闲队列是否有
86 ConnectionInfo connectionInfo = null;
87 //开始抓取连接
88 Begin:
89 while (!m_IsDisposed && !m_IdleConnections.IsEmpty)
90 {
91 if (m_IdleConnections.TryDequeue(out connectionInfo))
92 {
93 if (m_Connections.ContainsKey(connectionInfo.No))
94 {//取到的连接没有被销毁
95 break;
96 }
97 else
98 {//不可用则销毁此连接,继续寻找
99 DestoryConnection(connectionInfo);
100 connectionInfo = null;
101 }
102 }
103 else
104 {
105 connectionInfo = null;
106 }
107 }
108 if (!m_IsDisposed && connectionInfo == null)
109 {//没有取到连接
110 if (!CreateOrAddConnection(null))
111 {//创建连接失败,睡眠10ms
112 Thread.Sleep(10);
113 }
114 //继续抓取连接
115 goto Begin;
116 }
117 return connectionInfo;
118 }
119
120 ///
121 /// 创建或者添加连接
122 ///
123 public bool CreateOrAddConnection(IConnection connection)
124 {
125 bool rst = false;
126 if (m_Connections.Count m_Max)
127 {
128 lock (m_ConnectionLock)
129 {
130 if (m_Connections.Count m_Max)
131 {
132 if (connection == null)
133 {
134 connection = m_ConnectionFactory.Invoke();
135 }
136 var conInfo = new ConnectionInfo()
137 {
138 No = NewNo(),
139 Connection = connection,
140 CreateTime = DateTime.Now
141 };
142 if (m_Connections.TryAdd(conInfo.No, conInfo))
143 {
144 connection.Connect();
145 IsConnected = connection.IsConnected;
146 m_IdleConnections.Enqueue(conInfo);
147 rst = true;
148 OutputDebugInfo(string.Format("创建{0}", conInfo.No));
149 }
150 }
151 }
152 }
153 return rst;
154 }
155
156 ///
157 /// 销毁连接
158 ///
159 ///
160 private void DestoryConnection(ConnectionInfo connectionInfo)
161 {
162 ConnectionInfo temp;
163 while (m_Connections.ContainsKey(connectionInfo.No))
164 {
165 if (m_Connections.TryRemove(connectionInfo.No, out temp))
166 {
167 break;
168 }
169 else
170 {
171 Thread.Sleep(10);
172 }
173 }
174 try
175 {
176 connectionInfo.Connection.Disconnect();
177 }
178 catch (Exception ex)
179 {
180 OutputDebugInfo(string.Format("断开连接失败:{0}", ex.ToString()));
181 }
182 OutputDebugInfo(string.Format("销毁{0}", connectionInfo.No));
183 }
184
185 ///
186 /// 发送
187 ///
188 ///
189 ///
190 ///
191 ///
192 public void Send(byte[] rstData, Actionstring, byte[]> callback)
193 {
194 var connInfo = GrabConnection();
195 if (connInfo == null)
196 {
197 OutputDebugInfo("未获取到可用连接");
198 callback?.Invoke("无可用连接", null);
199 return;
200 }
201 OutputDebugInfo(string.Format("获取到连接{0}", connInfo.No));
202 if (connInfo.Connection.IsConnected)
203 {
204 connInfo.LastUsedTime = DateTime.Now;
205 try
206 {
207 connInfo.Connection.Send(rstData,
208 (error, rndData) =>
209 {
210 callback?.BeginInvoke(error, rndData, null, null);
211 //重新加入空闲队列
212 if (string.IsNullOrEmpty(error) && m_Connections.ContainsKey(connInfo.No))
213 {
214 m_IdleConnections.Enqueue(connInfo);
215 }
216 });
217 }
218 catch (Exception ex)
219 {
220 OutputDebugInfo(string.Format("发送失败:{0}", ex.ToString()));
221 }
222 }
223 else
224 {
225 callback?.BeginInvoke("断开连接", null, null, null);
226 DestoryConnection(connInfo);
227 }
228 }
229
230 ///
231 /// 是否已清理
232 ///
233 private bool m_IsDisposed = false;
234 ///
235 /// 清理
236 ///
237 public void Dispose()
238 {
239 if (m_IsDisposed)
240 {
241 return;
242 }
243 m_IsDisposed = true;
244
245 Clear(true);
246 OutputDebugInfo("释放完成");
247 }
248
249 private object m_ClearLock = new object();
250
251 ///
252 /// 开始清理
253 ///
254 private void StartClear()
255 {
256 ThreadPool.QueueUserWorkItem(
257 (obj) =>
258 {
259 Clear(false);
260 Thread.Sleep(1000);
261 if (!m_IsDisposed)
262 {
263 StartClear();
264 }
265 });
266 }
267
268 ///
269 /// 清理
270 ///
271 private void Clear(bool isForece)
272 {
273 lock (m_ClearLock)
274 {
275 var nos = m_Connections.Keys.ToList();
276 ConnectionInfo connInfo = null;
277 foreach (var no in nos)
278 {
279 if (m_Connections.TryGetValue(no, out connInfo))
280 {
281 try
282 {
283 connInfo.Connection.Send(new byte[0], null);
284 IsConnected = true;
285 }
286 catch
287 {
288 IsConnected = false;
289 }
290 if ((m_Connections.Count > m_Min && DateTime.Now - connInfo.LastUsedTime > m_Expired) || isForece)
291 {
292 DestoryConnection(connInfo);
293 }
294 }
295 }
296 }
297 }
298
299 ///
300 ///
301 ///
302 ///
303 private void OutputDebugInfo(string debugInfo)
304 {
305 System.Diagnostics.Debug.WriteLine(string.Format("{0}-ConnectionPool-{1}", m_ConnectionFactory == null ? "Server" : "Client", debugInfo));
306 }
307
308 public event EventHandler Connected;
309 public event EventHandler Disconnected;
310 }
用C#实现TCP连接池
标签:rlock ted cli 代码 大连 action == 过期 abc
原文地址:https://www.cnblogs.com/yp-maybe/p/12309705.html
评论