科技行者

行者学院 转型私董会 科技行者专题报道 网红大战科技行者

知识库

知识库 安全导航

至顶网软件频道基础软件.NET下可复用的TCP通信层实现之TCP组件

.NET下可复用的TCP通信层实现之TCP组件

  • 扫一扫
    分享文章到微信

  • 扫一扫
    关注官方公众号
    至顶头条

以前就很想将自己在Tcp通信层的开发心得、经验共享出来,但一直没有实现,究其原因,还是自己太懒了

作者:zhuweisky 来源:博客园 2007年11月4日

关键字:

  • 评论
  • 分享微博
  • 分享邮件
4.完成端口Tcp组件实现

  前面已经提到,完成端口模型本质思想是将"启动异步操作的线程"和"提供服务的线程"(即工作者线程)拆伙。只要做到这一点,就模拟了完成端口。

  分析一下我们需要几种类型的线程,首先我们需要一个线程来接收TCP连接请求,这就是所谓监听线程,当成功的接收到一个连接后,就向连接发送一个异步接收数据的请求,由于是异步操作,所以会立即返回,然后再去接收新的连接请求,如此监听线程就循环运作起来了(已经封装成前述的XTcpListener组件了)。值得提出的是,在异步接收的回调函数中,应该对接收到的数据进行处理,完成端口模型所做的就是将接收到的数据放在了完成端口队列中,注意,是一个队列。第二种线程类型,就是工作者线程。工作者线程的个数有个经验值是( Cpu个数×2 + 2),当然具体取多少,还要取决于你的应用的要求。工作者线程的任务就是不断地从完成端口队列中取出数据,并处理它,然后如果有回复,再将回复写入对应的连接。

  让我们来定义接口IRequestQueueManager,用于模拟完成端口的队列,该队列是线程安全的,用于将所有的请求进行排队,然后由工作者线程来轮流处理这些请求。

public interface IRequestQueueManager :IRequestPusher
{
 object Pop() ;//弹出队列中的下一个请求
 void Clear() ;
 int Length {get ;} //队列长度
}

public interface IRequestPusher
{
 void Push(object package) ; //向队列中压入一个请求
}

  在IRequestQueueManager的基础上,可以将工作者线程和启动异步操作的线程拆开了。由于工作者线程只与端口队列相关,所以我决定将它们一起封装起来--成为IIOCPManager,用于管理请求队列和工作者线程。

/// <summary>
/// IIOCPManager 完成端口管理者,主要管理工作者线程和完成端口队列。
/// </summary>
public interface IIOCPManager : IRequestPusher
{
 void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount) ;
 void Start() ; //启动工作者线程
 void Stop() ; //退出工作者线程

 int WorkThreadCount{get ;}

 event CallBackPackageHandled PackageHandled ;
}

//IOCPPackageHandler 用于处理从完成端口队列中取出的package
public interface IOCPPackageHandler
{
 void HandlerPackage(object package) ; //一般以同步实现
}

  有了IRequestQueueManager和IIOCPManager的支持,实现基于完成端口模型的Tcp组件就非常简单了。当然,你也可以单独使用IIOCPManager。你只要提供一个监听者线程接收连接,并将从连接接收到的数据通过IRequestPusher接口放入端口队列就可以了。 当然,为了处理接收到的数据,我们需要提供一个实现了IOCPPackageHandler接口的对象给IOCPManager。值得提出的是,你可以在数据处理并发送了回复数据后,再次投递一个异步接收请求,以保证能源源不断的从对应的TCP连接接收数据。下面,我们来看基于完成端口模型的Tcp组件的完整实现。

  完成端口Tcp组件

1/**//// <summary>
2 /// IocpTcp 完成端口Tcp组件。
3 /// </summary>
4 public class IocpTcp :ITcp ,IOCPPackageHandler
5 {
6 members#region members
7 private const int BufferSize = 1024 ;
8 private const int MaxWorkThreadNum = 50 ;
9
10 private IXTcpListener xtcpListener ;
11 private IIOCPManager iocpMgr = null ;
12 private ITcpReqStreamDispatcher messageDispatcher = null ;
13 private ContextKeyManager contextKeyMgr = new ContextKeyManager() ;
14 private bool stateIsStop = true ;
15 private bool validateRequest = false ;
16 private int curPort = 8888 ;
17 #endregion
18
19 public IocpTcp()
20 {
21
22 }
23 ITcp 成员#region ITcp 成员
24 public int ConnectionCount
25 {
26 get
27 {
28 return this.contextKeyMgr.ConnectionCount ;
29 }
30 }
31
32 #endregion
33
34 INet 成员#region INet 成员
35
36 InitializeAll ,UnitializeAll#region InitializeAll ,UnitializeAll
37 public void InitializeAll(IReqestStreamDispatcher i_dispatcher ,int port , bool userValidated)
38 {
39 this.messageDispatcher = i_dispatcher as ITcpReqStreamDispatcher;
40 if(this.messageDispatcher == null)
41 {
42 throw new Exception("Can't convert IReqestStreamDispatcher to ITcpReqStreamDispatcher in CompletePortManager.InitializeAll method ! ") ;
43 }
44
45 this.validateRequest = userValidated ;
46 this.curPort = port ;
47
48 this.InitializeAll() ;
49 }
50
51 public void InitializeAll()
52 {
53 this.xtcpListener = new XTcpListener(this.curPort) ;
54 this.xtcpListener.TcpConnectionEstablished += new CBackUserLogon(xtcpListener_TcpConnectionEstablished);
55 this.xtcpListener.DynamicMsgArrived += new CallBackDynamicMsg(this.PutoutDynamicMsg) ;
56 this.contextKeyMgr.StreamCountChanged += new CallBackCountChanged (this.OnStreamCountChanged) ;
57
58 this.iocpMgr = new IOCPManager() ;
59 this.iocpMgr.Initialize(this , IocpTcp.MaxWorkThreadNum) ;
60 }
61
62 public void UnitializeAll()
63 {
64 this.Stop() ;
65 this.xtcpListener.ExitListenThread() ;
66
67 //将事件容器清空==》防止外部框架再多次初始化的过程中将一个事件预定多次
68 this.ConnectionCountChanged = null ;
69 this.DynamicMsgArrived = null ;
70 this.ServiceCommitted = null ;
71 this.SomeOneConnected = null ;
72 this.SomeOneDisConnected = null ;
73 this.UserAction = null ;
74 }
75 #endregion
76
77 Start ,Stop#region Start ,Stop
78 public void Start()
79 {
80 try
81 {
82 if(this.stateIsStop)
83 {
84 this.stateIsStop = false ;
85 this.xtcpListener.Start() ;
86 this.iocpMgr.Start() ;
87 }
88 }
89 catch(Exception ee)
90 {
91 throw ee ;
92 }
93 }
94
95 public void Stop()
96 {
97 if(this.stateIsStop)
98 {
99 return ;
100 }
101
102 this.stateIsStop = true ;
103 this.xtcpListener.Stop() ;
104 this.iocpMgr.Stop() ;
105
106 //关闭所有连接
107 int count = 0 ;
108 while(! this.contextKeyMgr.IsAllStreamSafeToStop()) //等待所有流到达停止安全点
109 {
110 Thread.Sleep(200) ;
111 if(10 == count++)
112 {
113 break ;
114 }
115 }
116 this.contextKeyMgr.DisposeAllContextKey() ;
117 }
118 #endregion
119
120 public event EnterpriseServerBase.Network.CallBackDynamicMessage DynamicMsgArrived;
121
122 public NetAddinType GetProtocalType()
123 {
124 return NetAddinType.Tcp ;
125 }
126
127 #endregion
128
129 ITcpEventList 成员#region ITcpEventList 成员
130 public event EnterpriseServerBase.Network.CallBackForTcpUser2 SomeOneConnected;
131
132 public event EnterpriseServerBase.Network.CallBackForTcpMonitor ServiceCommitted;
133
134 public event EnterpriseServerBase.Network.CallBackForTcpCount ConnectionCountChanged;
135
136 public event EnterpriseServerBase.Network.CallBackForTcpUser1 SomeOneDisConnected;
137
138 public event EnterpriseServerBase.Network.CallBackForTcpUser UserAction;
139
140 #endregion
141
142 ITcpClientsController 成员#region ITcpClientsController 成员
143
144 public void SendData(int ConnectID, byte[] data)
145 {
146 this.SendData(ConnectID ,data ,0 ,data.Length) ;
147 }
148
149 public void SendData(int ConnectID, byte[] data ,int offset ,int size)
150 {
151 if((data == null) || (data.Length == 0) || (offset <0) ||(size <0) || (offset+size > data.Length))
152 {
153 return ;
154 }
155
156 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
157 if(netStream == null)
158 {
159 return ;
160 }
161
162 netStream.Write(data ,offset ,size) ;
163 }
164
165 public bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount)
166 {
167 readCount = 0 ;
168 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
169 if(netStream == null)
170 {
171 return false ;
172 }
173
174 readCount = netStream.Read(buffer ,offset ,size) ;
175
176 return true ;
177 }
178
179 public void DisposeOneConnection(int connectID, EnterpriseServerBase.Network.DisconnectedCause cause)
180 {
181 this.DisposeOneConnection(connectID) ;
182
183 if(this.SomeOneDisConnected != null)
184 {
185 this.SomeOneDisConnected(connectID ,cause) ;
186 }
187
188 this.ActivateUserActionEvent(connectID ,TcpUserAction.Exit) ;
189 }
190
191 /**//// <summary>
192 /// DisposeOneConnection 主要由用户管理模块调用--当无法检测到掉线情况时,该方法保证资源被释放
193 /// </summary>
194 private void DisposeOneConnection(int connectID)
195 {
196 this.contextKeyMgr.RemoveContextKey(connectID) ;
197 }
198
199 #endregion
200
201 private#region private
202 BindRequestToQueue#region BindRequestToQueue
203 private void BindRequestToQueue(IAsyncResult ar)
204 {
205 try
206 {
207 ContextKey key = (ContextKey)ar.AsyncState ;
208 key.BytesRead = key.NetStream.EndRead(ar) ;
209 if(! this.CheckData(key))
210 {
211 return ;
212 }
213
214 this.iocpMgr.Push(key) ;
215 }
216 catch(Exception ee)
217 {
218 ee = ee ;
219 }
220 }
221
222 CheckData#region CheckData
223 private bool CheckData(ContextKey key)
224 {
225 int streamHashcode = key.NetStream.GetHashCode() ;
226 if(this.stateIsStop)
227 {
228 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.ServerStopped) ;
229 return false;
230 }
231
232 if(key.BytesRead == 0) //表示客户端掉线或非正常关闭连接
233 {
234 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
235 return false ;
236 }
237
238 if(key.BytesRead == 8)//表示客户端正常关闭连接
239 {
240 string ss = System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer ,0 ,8) ;
241 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
242 return false;
243 }
244
245 return true ;
246 }
247 #endregion
248 #endregion
249
250 xtcpListener_TcpConnectionEstablished#region xtcpListener_TcpConnectionEstablished
251 private void xtcpListener_TcpConnectionEstablished(NetworkStream stream)
252 {
253 ISafeNetworkStream safeStream = new SafeNetworkStream(stream) ;
254 ContextKey key = new ContextKey(safeStream ,IocpTcp.BufferSize) ;
255 key.ResetBuffer(null) ;
256 this.contextKeyMgr.InsertContextKey(key) ;
257 int connectID = key.NetStream.GetHashCode() ;
258 if(this.SomeOneConnected != null)
259 {
260 this.SomeOneConnected(connectID) ;
261 }
262
263 this.ActivateUserActionEvent(connectID ,TcpUserAction.Logon) ;
264
265 key.IsFirstMsg = true ;
266 this.RecieveDataFrom(key) ;
267 }
268 #endregion
269
270 ActivateUserActionEvent#region ActivateUserActionEvent
271 private void ActivateUserActionEvent(int ConnectID ,TcpUserAction action)
272 {
273 if(this.UserAction != null)
274 {
275 this.UserAction(ConnectID ,action) ;
276 }
277 }
278 #endregion
279
280 PutoutDynamicMsg#region PutoutDynamicMsg
281 private void PutoutDynamicMsg(string msg)
282 {
283 if(this.DynamicMsgArrived != null)
284 {
285 this.DynamicMsgArrived(msg) ;
286 }
287 }
288 #endregion
289
290 OnStreamCountChanged#region OnStreamCountChanged
291 private void OnStreamCountChanged(int count)
292 {
293 if(this.ConnectionCountChanged != null)
294 {
295 this.ConnectionCountChanged(count) ;
296 }
297 }
298 #endregion
299
300 RecieveDataFrom#region RecieveDataFrom
301 private void RecieveDataFrom(ContextKey key)
302 {
303 try
304 {
305 key.StreamState = NetStreamState.Reading ;
306 key.NetStream.BeginRead(key.Buffer ,key.StartOffsetForRecieve ,key.MaxRecieveCapacity ,new AsyncCallback(this.BindRequestToQueue) ,key) ;
307 }
308 catch(Exception ee)
309 {
310 ee = ee ;
311 }
312
313 }
314 #endregion
315 #endregion
316
317 IOCPPackageHandler 成员#region IOCPPackageHandler 成员
318
319 public void HandlerPackage(object package)
320 {
321 ContextKey key = package as ContextKey ;
322 if(key == null)
323 {
324 return ;
325 }
326
327 int streamHashCode = key.NetStream.GetHashCode() ; //是SafeNetworkStream的hashcode
328
329 //处理请求
330 try
331 {
332 byte[] leftData = null ;
333 ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ;
334
335 if(this.validateRequest)
336 {
337 if(key.Validation.gotoCloseConnection)
338 {
339 this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;
340 return ;
341 }
342 }
343
344 key.StreamState = NetStreamState.Writing ;
345 if(repondList!= null && (repondList.Count != 0))
346 {
347 foreach(object obj in repondList)
348 {
349 byte[] respond_stream = (byte[])obj ;
350 key.NetStream.Write(respond_stream ,0 ,respond_stream.Length) ;
351 if(this.ServiceCommitted != null)
352 {
353 RespondInformation info = new RespondInformation() ;
354 info.ConnectID = streamHashCode ;
355 info.ServiceKey = this.messageDispatcher.GetServiceKey(respond_stream) ;
356 info.repondData = respond_stream ;
357 this.ServiceCommitted(info) ;
358 }
359 this.ActivateUserActionEvent(streamHashCode ,TcpUserAction.FunctionAccess) ;
360 }
361 }
362
363 if(key.IsFirstMsg)
364 {
365 if(repondList == null || (repondList.Count == 0)) //表示第一条消息还未接收完全
366 {
367 key.IsFirstMsg = true ;
368 }
369 else
370 {
371 key.IsFirstMsg = false ;
372 }
373 }
374
375 key.StreamState = NetStreamState.Idle ;
376
377 key.ResetBuffer(leftData) ;
378
379 if(! this.stateIsStop)
380 {
381 //继续接收请求
382 this.RecieveDataFrom(key) ;
383 }
384 else //停止服务
385 {
386 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
387 }
388 }
389 catch(Exception ee)
390 {
391 if(ee is System.IO.IOException) //正在读写流的时候,连接断开
392 {
393 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
394 }
395
396 ee = ee ;
397 }
398 }
399
400 #endregion
401
402 INet 成员#region INet 成员
403
404 public IReqestStreamDispatcher Dispatcher
405 {
406 set
407 {
408 this.messageDispatcher = (ITcpReqStreamDispatcher)value ;
409 }
410 }
411
412 public int Port
413 {
414 set
415 {
416 this.curPort = value ;
417 }
418 get
419 {
420 return this.curPort ;
421 }
422 }
423
424 public bool UserValidated
425 {
426 set
427 {
428 this.validateRequest = value ;
429 }
430 }
431
432 #endregion
433 }
    • 评论
    • 分享微博
    • 分享邮件
    邮件订阅

    如果您非常迫切的想了解IT领域最新产品与技术信息,那么订阅至顶网技术邮件将是您的最佳途径之一。

    重磅专题
    往期文章
    最新文章