(3)
消息处理器工厂
消息处理器工厂根据消息的类型(TypeKey、ServiceKey)创建对应的消息处理器来出来该消息,其接口定义如下:
public interface IRequestDealerFactory
{
IRequestDealer CreateDealer(int requestType ,int serverTypeKey) ;//serverTypeKey 比如城市代号
event CbackRequestRecieved RequestRecieved ;
}
CreateDealer方法返回的IRequestDealer就是消息处理器,每一个消息处理器用于处理某种特定类型(ServiceKey)的所有请求。通常,可以将消息处理器封装成插件DLL,以实现功能服务的“热插拔”。
(4)消息处理器
消息处理器IRequestDealer定义如下:
public interface IRequestDealer
{
byte[] DealRequestMessage(RoundedRequestMsg reqMsg ) ;//同步回复
event CbackRequestRecieved RequestRecieved ;
}
public delegate void CbackRequestRecieved(RoundedRequestMsg roundedMsg) ;
/// <summary>
/// RoundedRequestMsg 对应于一条完整的请求
/// </summary>
public struct RoundedRequestMsg
{
public int ConnectID ; //请求所对应的Tcp连接
public byte[] Data ;
}
RoundedRequestMsg.Data是经消息分裂器分裂得到的一个完整的请求消息,一个字节不多、一个字节也不少。
(5)ITcpStreamDispatcherHook
ITcpStreamDispatcherHook是一个Hook,它为用户提供了一个自定义的对请求/回复消息进行操作的插入点。ITcpStreamDispatcherHook由TcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理或转换这些消息,比如常用的处理/转换操作包括:加密/解密、消息验证等等。ITcpStreamDispatcherHook定义如下:
/// <summary>
/// ITcpStreamDispatcherHook 由TcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理转换这些消息,
/// 比如加密/解密。
/// </summary>
public interface ITcpStreamDispatcherHook
{
//转换消息
byte[] CaptureRequestMsg(byte[] roundedMsg) ;
byte[] CaptureRespondMsg(byte[] roundedMsg) ;
//验证消息,以下验证的消息是还没有被捕获的消息
bool VerifyFirstMsgOfUser(byte[] roundedMsg ,ref RequestValidation validation) ;
bool VerifyOtherMessage(byte[] roundedMsg ,ref RequestValidation validation) ;
}
关于这个接口中各方法的含义可以在消息分派器的实现中更好的领会!
3.消息分派器实现
在前述的基本元素的基础上,实现消息分派器非常简单,我们来看其核心方法DealRequestMessage的实现源码:
private IMessageSplitter curMsgSplitter = new MessageSpliter() ;
private IDataStreamHelper curMsgHelper ; //必须设置
private IRequestDealerFactory curDealerFactory ; //必须设置
private ITcpStreamDispatcherHook tcpStreamDispatcherHook ;
public ArrayList DealRequestMessage(RequestData requestData, out byte[] leftData, ref RequestValidation validation)
{
//消息分裂
ArrayList respondList = new ArrayList() ;
ArrayList reqList = this.curMsgSplitter.SplitRequestMsgs(requestData.Buff ,requestData.ValidCount ,out leftData) ;
if(reqList == null)
{
return respondList ;
}
bool verified = true ;
for(int i=0; i<reqList.Count ;i++)
{
byte[] theData = (byte[])reqList[i] ;
#region 验证消息
if(requestData.IsFirstMsg && (i == 0))
{
verified = this.tcpStreamDispatcherHook.VerifyFirstMsgOfUser(theData ,ref validation) ;
}
else
{
verified = this.tcpStreamDispatcherHook.VerifyOtherMessage(theData ,ref validation ) ;
}
if(! verified)
{
if(validation.gotoCloseConnection)
{
return null ;
}
this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(theData ,ServiceFailureType.InvalidMessge)) ;
continue ;
}
#endregion
//接插,捕获/转换请求消息
byte[] reqData = this.tcpStreamDispatcherHook.CaptureRequestMsg(theData) ;
#region 处理消息
//处理消息
IDataStreamHeader header = this.curMsgHelper.ParseMessageHeader(reqData ,0);
IRequestDealer dealer = this.curDealerFactory.CreateDealer(header.ServiceKey ,header.TypeKey) ;
if(dealer == null)
{
this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(reqData ,ServiceFailureType.ServiceIsNotExit)) ;
continue ;
}
RoundedRequestMsg roundReqMsg = new RoundedRequestMsg();
roundReqMsg.ConnectID = requestData.ConnectID ;
roundReqMsg.Data = reqData ;
try
{
byte[] respondData = dealer.DealRequestMessage(roundReqMsg) ;
if(respondData != null)
{
this.AddRespondToList(respondList ,respondData) ;
}
}
catch(Exception ee)
{
this.AddRespondToList(respondList , this.curMsgHelper.GetRespondWhenFailure(reqData ,ee.Message)) ;
}
#endregion
}
return respondList;
}
//将回复消息加密后放入list
private void AddRespondToList(ArrayList list ,byte[] theRespondData)
{
//接插,捕获/转换回复消息
byte[] respondData = this.tcpStreamDispatcherHook.CaptureRespondMsg(theRespondData) ;
list.Add(respondData) ;
}
如果你是一直按顺序读下来的,理解上面的实现一定不成什么问题。到这里,Tcp通信层的所有重要的设施基本都已介绍完毕,最后,给出了提示,即,在你的应用中,如何使用这个可复用的Tcp通信层。步骤如下:
(1)实现IDataStreamHelper接口。
(2)实现IReqestStreamDispatcher接口,如果采用的是Tcp协议,则可直接使用参考实现TcpStreamDispatcher
(3)实现各种请求处理器,这些处理器实现IRequestDealer接口。
(4)实现IRequestDealerFactory接口。
接下来,还有什么?其实,还有很多,都可以提高到框架的层次,以便复用。比如,前面我们处理消息都是基于流(byte[])的形式,在此基础上,我们可以更上一层,采用基于对象的形式――即,将请求消息和回复消息都封装成类,这就涉及了流的解析(流=>对象)和对象序列化(消息对象=>流)问题;另外,我们甚至可以将Tcp用户管理纳入到框架的高度,以进行复用,比如,通常基于Tcp服务的系统都需要管理在线的Tcp用户,并记录Tcp用户请求服务的具体信息、在线时间等,这些经过良好的分析概括都可以提高到复用的高度。以后有时间,我会将这样的经验和大家分享。
最后,把EnterpriseServerBase类库中的Network命名空间中的源码和大家共享,希望对大家有所帮助!(另,该命名空间中已经包含了上述的基于对象的消息和Tcp用户管理的可复用组件)。
查看本文来源