科技行者

行者学院 转型私董会 科技行者专题报道 网红大战科技行者

知识库

知识库 安全导航

至顶网软件频道基础软件.NET下可复用的TCP通信层实现之TCP组件

.NET下可复用的TCP通信层实现之TCP组件

  • 扫一扫
    分享文章到微信

  • 扫一扫
    关注官方公众号
    至顶头条

以前就很想将自己在Tcp通信层的开发心得、经验共享出来,但一直没有实现,究其原因,还是自己太懒了

作者:zhuweisky 来源:博客园 2007年11月4日

关键字:

  • 评论
  • 分享微博
  • 分享邮件
5.异步Tcp组件实现

  这种方式的主要思想是:当一个新的Tcp连接建立时,就在该连接上发送一个异步接收的请求(BeginRead),并在异步回调中处理该请求,当请求处理完毕,再次发送异步接收请求,如此循环下去。异步接收启用的是系统默认线程池中的线程,所以,在异步Tcp组件中不用显式管理工作线程。异步Tcp组件的实现相对于完成端口模型而言简单许多,也单纯一些,不用管理请求队列,不需使用工作者线程等等。但是,相比于完成端口模型,其也有明显的缺陷:一个Tcp连接绑定到了一个线程,即使这个线程是后台线程池中的。如果用户数量巨大,这对性能是极其不利的;而完成端口模型,则可以限定工作者线程的个数,并且可以根据应用的类型进行灵活调节。

  异步Tcp组件实现源码。

  异步Tcp组件

1/**//// <summary>
2 /// AsynTcp 异步Tcp组件。
3 /// </summary>
4 public class AsynTcp :ITcp
5 {
6 members#region members
7 private const int BufferSize = 1024 ;
8
9 private IXTcpListener xtcpListener = null ;
10 private ITcpReqStreamDispatcher messageDispatcher = null ;
11 private ContextKeyManager contextKeyMgr = new ContextKeyManager() ;
12 private bool stateIsStop = true ;
13 private bool validateRequest = false ;
14 private int curPort = 8888 ;
15 #endregion
16
17
18 public AsynTcp()
19 {
20
21 }
22
23 INet 成员#region INet 成员
24
25 public event CallBackDynamicMessage DynamicMsgArrived;
26
27 public NetAddinType GetProtocalType()
28 {
29
30 return NetAddinType.Tcp;
31 }
32
33 InitializeAll ,UnitializeAll#region InitializeAll ,UnitializeAll
34 public void InitializeAll(IReqestStreamDispatcher i_dispatcher, int port, bool userValidated)
35 {
36 this.messageDispatcher = i_dispatcher as ITcpReqStreamDispatcher;
37 if(this.messageDispatcher == null)
38 {
39 throw new Exception("Can't convert IReqestStreamDispatcher to ITcpReqStreamDispatcher in CompletePortManager.InitializeAll method ! ") ;
40 }
41
42 this.curPort = port ;
43 this.validateRequest = userValidated ;
44
45 this.InitializeAll() ;
46 }
47
48 public void InitializeAll()
49 {
50 this.xtcpListener = new XTcpListener(this.curPort) ;
51 this.xtcpListener.TcpConnectionEstablished += new CBackUserLogon(xtcpListener_TcpConnectionEstablished);
52 this.xtcpListener.DynamicMsgArrived += new CallBackDynamicMsg(this.PutoutDynamicMsg) ;
53 this.contextKeyMgr.StreamCountChanged += new CallBackCountChanged(this.OnStreamCountChanged) ;
54 }
55
56 public void UnitializeAll()
57 {
58 this.Stop() ;
59 this.xtcpListener.ExitListenThread() ;
60
61 //将事件容器清空==》防止外部框架再多次初始化的过程中将一个事件预定多次
62 this.ConnectionCountChanged = null ;
63 this.DynamicMsgArrived = null ;
64 this.ServiceCommitted = null ;
65 this.SomeOneConnected = null ;
66 this.SomeOneDisConnected = null ;
67 this.UserAction = null ;
68 }
69
70 #endregion
71
72 Start ,Stop#region Start ,Stop
73 public void Start()
74 {
75 if(this.stateIsStop)
76 {
77 this.xtcpListener.Start() ;
78 this.stateIsStop = false ;
79 }
80 }
81
82 public void Stop()
83 {
84 if(this.stateIsStop)
85 {
86 return ;
87 }
88
89 this.stateIsStop = true ;
90 this.xtcpListener.Stop() ;
91
92 //关闭所有连接
93 int count = 0 ;
94 while(! this.contextKeyMgr.IsAllStreamSafeToStop()) //等待所有流到达停止安全点
95 {
96 Thread.Sleep(200) ;
97 if(10 == count++)
98 {
99 break ;
100 }
101 }
102 this.contextKeyMgr.DisposeAllContextKey() ;
103 }
104 #endregion
105
106 #endregion
107
108 ITcpEventList 成员#region ITcpEventList 成员
109
110 public event EnterpriseServerBase.Network.CallBackForTcpUser2 SomeOneConnected;
111
112 public event EnterpriseServerBase.Network.CallBackForTcpMonitor ServiceCommitted;
113
114 public event EnterpriseServerBase.Network.CallBackForTcpCount ConnectionCountChanged;
115
116 public event EnterpriseServerBase.Network.CallBackForTcpUser1 SomeOneDisConnected;
117
118 public event EnterpriseServerBase.Network.CallBackForTcpUser UserAction;
119
120 #endregion
121
122 ITcpClientsController 成员#region ITcpClientsController 成员
123
124 public bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount)
125 {
126 readCount = 0 ;
127 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
128 if(netStream == null)
129 {
130 return false ;
131 }
132
133 readCount = netStream.Read(buffer ,offset ,size) ;
134
135 return true ;
136 }
137
138 public void SendData(int ConnectID, byte[] data)
139 {
140 this.SendData(ConnectID ,data ,0 ,data.Length) ;
141 }
142
143 public void SendData(int ConnectID, byte[] data ,int offset ,int size)
144 {
145 if((data == null) || (data.Length == 0) || (offset <0) ||(size <0) || (offset+size > data.Length))
146 {
147 return ;
148 }
149
150 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
151 if(netStream == null)
152 {
153 return ;
154 }
155
156 netStream.Write(data ,offset ,size) ;
157 }
158
159 public void DisposeOneConnection(int connectID, DisconnectedCause cause)
160 {
161 this.DisposeOneConnection(connectID) ;
162
163 if(this.SomeOneDisConnected != null)
164 {
165 this.SomeOneDisConnected(connectID , cause) ;
166 }
167
168 this.ActivateUserActionEvent(connectID ,TcpUserAction.Exit) ;
169 }
170
171 #endregion
172
173 ITcp 成员#region ITcp 成员
174 public int ConnectionCount
175 {
176 get
177 {
178 return this.contextKeyMgr.ConnectionCount ;
179 }
180 }
181
182 #endregion
183
184 private#region private
185
186 ActivateUserActionEvent#region ActivateUserActionEvent
187 private void ActivateUserActionEvent(int ConnectID ,TcpUserAction action)
188 {
189 if(this.UserAction != null)
190 {
191 this.UserAction(ConnectID ,action) ;
192 }
193 }
194 #endregion
195
196 DisposeOneConnection#region DisposeOneConnection
197 /**//// <summary>
198 /// DisposeOneConnection 主要由用户管理模块调用--当无法检测到掉线情况时,该方法保证资源被释放
199 /// </summary>
200 private void DisposeOneConnection(int connectID)
201 {
202 this.contextKeyMgr.RemoveContextKey(connectID) ;
203 }
204 #endregion
205
206 xtcpListener_TcpConnectionEstablished#region xtcpListener_TcpConnectionEstablished
207 private void xtcpListener_TcpConnectionEstablished(NetworkStream stream)
208 {
209 ISafeNetworkStream safeStream = new SafeNetworkStream(stream) ;
210
211 ContextKey key = new ContextKey(safeStream ,AsynTcp.BufferSize) ;
212 key.ResetBuffer(null) ;
213 this.contextKeyMgr.InsertContextKey(key) ;
214 int connectID = key.NetStream.GetHashCode() ;
215
216 if(this.SomeOneConnected != null)
217 {
218 this.SomeOneConnected(connectID) ;
219 }
220 this.ActivateUserActionEvent(connectID ,TcpUserAction.Logon) ;
221
222 key.IsFirstMsg = true ;
223 this.RecieveDataFrom(key) ;
224 }
225 #endregion
226
227 PutoutDynamicMsg#region PutoutDynamicMsg
228 private void PutoutDynamicMsg(string msg)
229 {
230 if(this.DynamicMsgArrived != null)
231 {
232 this.DynamicMsgArrived(msg) ;
233 }
234 }
235 #endregion
236
237 OnStreamCountChanged#region OnStreamCountChanged
238 private void OnStreamCountChanged(int count)
239 {
240 if(this.ConnectionCountChanged != null)
241 {
242 this.ConnectionCountChanged(count) ;
243 }
244 }
245 #endregion
246
247 RecieveDataFrom#region RecieveDataFrom
248 private void RecieveDataFrom(ContextKey key)
249 {
250 key.StreamState = NetStreamState.Reading ;
251 key.NetStream.BeginRead(key.Buffer ,key.StartOffsetForRecieve ,key.MaxRecieveCapacity ,new AsyncCallback(this.ServeOverLap) ,key) ;
252
253 }
254 #endregion
255
256 ServeOverLap#region ServeOverLap
257 private void ServeOverLap(IAsyncResult ar)
258 {
259 ContextKey key = (ContextKey)ar.AsyncState ;
260 int streamHashCode = key.NetStream.GetHashCode() ; //是SafeNetworkStream的hashcode
261
262 try
263 {
264 key.BytesRead = key.NetStream.EndRead(ar) ;
265
266 if(! this.CheckData(key))
267 {
268 return ;
269 }
270
271 //处理请求
272 byte[] leftData = null ;
273 ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ;
274
275 if(this.validateRequest)
276 {
277 if(key.Validation.gotoCloseConnection)
278 {
279 this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;
280 }
281 }
282
283 key.StreamState = NetStreamState.Writing ;
284 if(repondList!= null && (repondList.Count != 0))
285 {
286 foreach(object obj in repondList)
287 {
288 byte[] respond_stream = (byte[])obj ;
289 key.NetStream.Write(respond_stream ,0 ,respond_stream.Length) ;
290 if(this.ServiceCommitted != null)
291 {
292 RespondInformation info = new RespondInformation() ;
293 info.ConnectID = streamHashCode ;
294 info.ServiceKey = this.messageDispatcher.GetServiceKey(respond_stream) ;
295 info.repondData = respond_stream ;
296 this.ServiceCommitted(info) ;
297 }
298
299 this.ActivateUserActionEvent(streamHashCode ,TcpUserAction.FunctionAccess) ;
300 }
301 }
302
303 if(key.IsFirstMsg)
304 {
305 if(repondList == null || (repondList.Count == 0)) //表示第一条消息还未接收完全
306 {
307 key.IsFirstMsg = true ;
308 }
309 else
310 {
311 key.IsFirstMsg = false ;
312 }
313 }
314
315 key.StreamState = NetStreamState.Idle ;
316
317 key.ResetBuffer(leftData) ;
318
319 if(! this.stateIsStop)
320 {
321 //继续接收请求
322 this.RecieveDataFrom(key) ;
323 }
324 else //停止服务
325 {
326 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
327 }
328 }
329 catch(Exception ee)
330 {
331 if(ee is System.IO.IOException) //正在读写流的时候,连接断开
332 {
333 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
334 }
335
336 ee = ee ;
337 }
338 }
339 #endregion
340
341 CheckData#region CheckData
342 private bool CheckData(ContextKey key)
343 {
344 int streamHashcode = key.NetStream.GetHashCode() ;
345 if(this.stateIsStop)
346 {
347 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.ServerStopped) ;
348 return false;
349 }
350
351 if(key.BytesRead == 0) //表示客户端掉线或非正常关闭连接
352 {
353 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
354 return false ;
355 }
356
357 if(key.BytesRead == 8)//表示客户端正常关闭连接
358 {
359 string ss = System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer ,0 ,8) ;
360 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
361 return false;
362 }
363
364 return true ;
365 }
366 #endregion
367 #endregion
368
369 INet 成员#region INet 成员
370
371 public IReqestStreamDispatcher Dispatcher
372 {
373 set
374 {
375 this.messageDispatcher = (ITcpReqStreamDispatcher)value ;
376 }
377 }
378
379 public int Port
380 {
381 set
382 {
383 this.curPort = value ;
384 }
385 get
386 {
387 return this.curPort ;
388 }
389 }
390
391 public bool UserValidated
392 {
393 set
394 {
395 this.validateRequest = value ;
396 }
397 }
398
399 #endregion
400 }

  今天介绍了Tcp通信层中的核心――Tcp组件,仅仅复用Tcp组件已经能为我们省去很多麻烦了,如果想进行更高层次的复用――整个Tcp通信层的复用,请关注本篇的续文。

查看本文来源

    • 评论
    • 分享微博
    • 分享邮件
    邮件订阅

    如果您非常迫切的想了解IT领域最新产品与技术信息,那么订阅至顶网技术邮件将是您的最佳途径之一。

    重磅专题
    往期文章
    最新文章