扫一扫
分享文章到微信
扫一扫
关注官方公众号
至顶头条
作者:zhuweisky 来源:博客园 2007年11月4日
关键字:
public interface IRequestQueueManager :IRequestPusher { object Pop() ;//弹出队列中的下一个请求 void Clear() ; int Length {get ;} //队列长度 } public interface IRequestPusher { void Push(object package) ; //向队列中压入一个请求 } |
/// <summary> /// IIOCPManager 完成端口管理者,主要管理工作者线程和完成端口队列。 /// </summary> public interface IIOCPManager : IRequestPusher { void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount) ; void Start() ; //启动工作者线程 void Stop() ; //退出工作者线程 int WorkThreadCount{get ;} event CallBackPackageHandled PackageHandled ; } //IOCPPackageHandler 用于处理从完成端口队列中取出的package public interface IOCPPackageHandler { void HandlerPackage(object package) ; //一般以同步实现 } |
1/**//// <summary> 2 /// IocpTcp 完成端口Tcp组件。 3 /// </summary> 4 public class IocpTcp :ITcp ,IOCPPackageHandler 5 { 6 members#region members 7 private const int BufferSize = 1024 ; 8 private const int MaxWorkThreadNum = 50 ; 9 10 private IXTcpListener xtcpListener ; 11 private IIOCPManager iocpMgr = null ; 12 private ITcpReqStreamDispatcher messageDispatcher = null ; 13 private ContextKeyManager contextKeyMgr = new ContextKeyManager() ; 14 private bool stateIsStop = true ; 15 private bool validateRequest = false ; 16 private int curPort = 8888 ; 17 #endregion 18 19 public IocpTcp() 20 { 21 22 } 23 ITcp 成员#region ITcp 成员 24 public int ConnectionCount 25 { 26 get 27 { 28 return this.contextKeyMgr.ConnectionCount ; 29 } 30 } 31 32 #endregion 33 34 INet 成员#region INet 成员 35 36 InitializeAll ,UnitializeAll#region InitializeAll ,UnitializeAll 37 public void InitializeAll(IReqestStreamDispatcher i_dispatcher ,int port , bool userValidated) 38 { 39 this.messageDispatcher = i_dispatcher as ITcpReqStreamDispatcher; 40 if(this.messageDispatcher == null) 41 { 42 throw new Exception("Can't convert IReqestStreamDispatcher to ITcpReqStreamDispatcher in CompletePortManager.InitializeAll method ! ") ; 43 } 44 45 this.validateRequest = userValidated ; 46 this.curPort = port ; 47 48 this.InitializeAll() ; 49 } 50 51 public void InitializeAll() 52 { 53 this.xtcpListener = new XTcpListener(this.curPort) ; 54 this.xtcpListener.TcpConnectionEstablished += new CBackUserLogon(xtcpListener_TcpConnectionEstablished); 55 this.xtcpListener.DynamicMsgArrived += new CallBackDynamicMsg(this.PutoutDynamicMsg) ; 56 this.contextKeyMgr.StreamCountChanged += new CallBackCountChanged (this.OnStreamCountChanged) ; 57 58 this.iocpMgr = new IOCPManager() ; 59 this.iocpMgr.Initialize(this , IocpTcp.MaxWorkThreadNum) ; 60 } 61 62 public void UnitializeAll() 63 { 64 this.Stop() ; 65 this.xtcpListener.ExitListenThread() ; 66 67 //将事件容器清空==》防止外部框架再多次初始化的过程中将一个事件预定多次 68 this.ConnectionCountChanged = null ; 69 this.DynamicMsgArrived = null ; 70 this.ServiceCommitted = null ; 71 this.SomeOneConnected = null ; 72 this.SomeOneDisConnected = null ; 73 this.UserAction = null ; 74 } 75 #endregion 76 77 Start ,Stop#region Start ,Stop 78 public void Start() 79 { 80 try 81 { 82 if(this.stateIsStop) 83 { 84 this.stateIsStop = false ; 85 this.xtcpListener.Start() ; 86 this.iocpMgr.Start() ; 87 } 88 } 89 catch(Exception ee) 90 { 91 throw ee ; 92 } 93 } 94 95 public void Stop() 96 { 97 if(this.stateIsStop) 98 { 99 return ; 100 } 101 102 this.stateIsStop = true ; 103 this.xtcpListener.Stop() ; 104 this.iocpMgr.Stop() ; 105 106 //关闭所有连接 107 int count = 0 ; 108 while(! this.contextKeyMgr.IsAllStreamSafeToStop()) //等待所有流到达停止安全点 109 { 110 Thread.Sleep(200) ; 111 if(10 == count++) 112 { 113 break ; 114 } 115 } 116 this.contextKeyMgr.DisposeAllContextKey() ; 117 } 118 #endregion 119 120 public event EnterpriseServerBase.Network.CallBackDynamicMessage DynamicMsgArrived; 121 122 public NetAddinType GetProtocalType() 123 { 124 return NetAddinType.Tcp ; 125 } 126 127 #endregion 128 129 ITcpEventList 成员#region ITcpEventList 成员 130 public event EnterpriseServerBase.Network.CallBackForTcpUser2 SomeOneConnected; 131 132 public event EnterpriseServerBase.Network.CallBackForTcpMonitor ServiceCommitted; 133 134 public event EnterpriseServerBase.Network.CallBackForTcpCount ConnectionCountChanged; 135 136 public event EnterpriseServerBase.Network.CallBackForTcpUser1 SomeOneDisConnected; 137 138 public event EnterpriseServerBase.Network.CallBackForTcpUser UserAction; 139 140 #endregion 141 142 ITcpClientsController 成员#region ITcpClientsController 成员 143 144 public void SendData(int ConnectID, byte[] data) 145 { 146 this.SendData(ConnectID ,data ,0 ,data.Length) ; 147 } 148 149 public void SendData(int ConnectID, byte[] data ,int offset ,int size) 150 { 151 if((data == null) || (data.Length == 0) || (offset <0) ||(size <0) || (offset+size > data.Length)) 152 { 153 return ; 154 } 155 156 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ; 157 if(netStream == null) 158 { 159 return ; 160 } 161 162 netStream.Write(data ,offset ,size) ; 163 } 164 165 public bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount) 166 { 167 readCount = 0 ; 168 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ; 169 if(netStream == null) 170 { 171 return false ; 172 } 173 174 readCount = netStream.Read(buffer ,offset ,size) ; 175 176 return true ; 177 } 178 179 public void DisposeOneConnection(int connectID, EnterpriseServerBase.Network.DisconnectedCause cause) 180 { 181 this.DisposeOneConnection(connectID) ; 182 183 if(this.SomeOneDisConnected != null) 184 { 185 this.SomeOneDisConnected(connectID ,cause) ; 186 } 187 188 this.ActivateUserActionEvent(connectID ,TcpUserAction.Exit) ; 189 } 190 191 /**//// <summary> 192 /// DisposeOneConnection 主要由用户管理模块调用--当无法检测到掉线情况时,该方法保证资源被释放 193 /// </summary> 194 private void DisposeOneConnection(int connectID) 195 { 196 this.contextKeyMgr.RemoveContextKey(connectID) ; 197 } 198 199 #endregion 200 201 private#region private 202 BindRequestToQueue#region BindRequestToQueue 203 private void BindRequestToQueue(IAsyncResult ar) 204 { 205 try 206 { 207 ContextKey key = (ContextKey)ar.AsyncState ; 208 key.BytesRead = key.NetStream.EndRead(ar) ; 209 if(! this.CheckData(key)) 210 { 211 return ; 212 } 213 214 this.iocpMgr.Push(key) ; 215 } 216 catch(Exception ee) 217 { 218 ee = ee ; 219 } 220 } 221 222 CheckData#region CheckData 223 private bool CheckData(ContextKey key) 224 { 225 int streamHashcode = key.NetStream.GetHashCode() ; 226 if(this.stateIsStop) 227 { 228 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.ServerStopped) ; 229 return false; 230 } 231 232 if(key.BytesRead == 0) //表示客户端掉线或非正常关闭连接 233 { 234 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ; 235 return false ; 236 } 237 238 if(key.BytesRead == 8)//表示客户端正常关闭连接 239 { 240 string ss = System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer ,0 ,8) ; 241 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ; 242 return false; 243 } 244 245 return true ; 246 } 247 #endregion 248 #endregion 249 250 xtcpListener_TcpConnectionEstablished#region xtcpListener_TcpConnectionEstablished 251 private void xtcpListener_TcpConnectionEstablished(NetworkStream stream) 252 { 253 ISafeNetworkStream safeStream = new SafeNetworkStream(stream) ; 254 ContextKey key = new ContextKey(safeStream ,IocpTcp.BufferSize) ; 255 key.ResetBuffer(null) ; 256 this.contextKeyMgr.InsertContextKey(key) ; 257 int connectID = key.NetStream.GetHashCode() ; 258 if(this.SomeOneConnected != null) 259 { 260 this.SomeOneConnected(connectID) ; 261 } 262 263 this.ActivateUserActionEvent(connectID ,TcpUserAction.Logon) ; 264 265 key.IsFirstMsg = true ; 266 this.RecieveDataFrom(key) ; 267 } 268 #endregion 269 270 ActivateUserActionEvent#region ActivateUserActionEvent 271 private void ActivateUserActionEvent(int ConnectID ,TcpUserAction action) 272 { 273 if(this.UserAction != null) 274 { 275 this.UserAction(ConnectID ,action) ; 276 } 277 } 278 #endregion 279 280 PutoutDynamicMsg#region PutoutDynamicMsg 281 private void PutoutDynamicMsg(string msg) 282 { 283 if(this.DynamicMsgArrived != null) 284 { 285 this.DynamicMsgArrived(msg) ; 286 } 287 } 288 #endregion 289 290 OnStreamCountChanged#region OnStreamCountChanged 291 private void OnStreamCountChanged(int count) 292 { 293 if(this.ConnectionCountChanged != null) 294 { 295 this.ConnectionCountChanged(count) ; 296 } 297 } 298 #endregion 299 300 RecieveDataFrom#region RecieveDataFrom 301 private void RecieveDataFrom(ContextKey key) 302 { 303 try 304 { 305 key.StreamState = NetStreamState.Reading ; 306 key.NetStream.BeginRead(key.Buffer ,key.StartOffsetForRecieve ,key.MaxRecieveCapacity ,new AsyncCallback(this.BindRequestToQueue) ,key) ; 307 } 308 catch(Exception ee) 309 { 310 ee = ee ; 311 } 312 313 } 314 #endregion 315 #endregion 316 317 IOCPPackageHandler 成员#region IOCPPackageHandler 成员 318 319 public void HandlerPackage(object package) 320 { 321 ContextKey key = package as ContextKey ; 322 if(key == null) 323 { 324 return ; 325 } 326 327 int streamHashCode = key.NetStream.GetHashCode() ; //是SafeNetworkStream的hashcode 328 329 //处理请求 330 try 331 { 332 byte[] leftData = null ; 333 ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ; 334 335 if(this.validateRequest) 336 { 337 if(key.Validation.gotoCloseConnection) 338 { 339 this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ; 340 return ; 341 } 342 } 343 344 key.StreamState = NetStreamState.Writing ; 345 if(repondList!= null && (repondList.Count != 0)) 346 { 347 foreach(object obj in repondList) 348 { 349 byte[] respond_stream = (byte[])obj ; 350 key.NetStream.Write(respond_stream ,0 ,respond_stream.Length) ; 351 if(this.ServiceCommitted != null) 352 { 353 RespondInformation info = new RespondInformation() ; 354 info.ConnectID = streamHashCode ; 355 info.ServiceKey = this.messageDispatcher.GetServiceKey(respond_stream) ; 356 info.repondData = respond_stream ; 357 this.ServiceCommitted(info) ; 358 } 359 this.ActivateUserActionEvent(streamHashCode ,TcpUserAction.FunctionAccess) ; 360 } 361 } 362 363 if(key.IsFirstMsg) 364 { 365 if(repondList == null || (repondList.Count == 0)) //表示第一条消息还未接收完全 366 { 367 key.IsFirstMsg = true ; 368 } 369 else 370 { 371 key.IsFirstMsg = false ; 372 } 373 } 374 375 key.StreamState = NetStreamState.Idle ; 376 377 key.ResetBuffer(leftData) ; 378 379 if(! this.stateIsStop) 380 { 381 //继续接收请求 382 this.RecieveDataFrom(key) ; 383 } 384 else //停止服务 385 { 386 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ; 387 } 388 } 389 catch(Exception ee) 390 { 391 if(ee is System.IO.IOException) //正在读写流的时候,连接断开 392 { 393 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ; 394 } 395 396 ee = ee ; 397 } 398 } 399 400 #endregion 401 402 INet 成员#region INet 成员 403 404 public IReqestStreamDispatcher Dispatcher 405 { 406 set 407 { 408 this.messageDispatcher = (ITcpReqStreamDispatcher)value ; 409 } 410 } 411 412 public int Port 413 { 414 set 415 { 416 this.curPort = value ; 417 } 418 get 419 { 420 return this.curPort ; 421 } 422 } 423 424 public bool UserValidated 425 { 426 set 427 { 428 this.validateRequest = value ; 429 } 430 } 431 432 #endregion 433 } |
如果您非常迫切的想了解IT领域最新产品与技术信息,那么订阅至顶网技术邮件将是您的最佳途径之一。
现场直击|2021世界人工智能大会
直击5G创新地带,就在2021MWC上海
5G已至 转型当时——服务提供商如何把握转型的绝佳时机
寻找自己的Flag
华为开发者大会2020(Cloud)- 科技行者