|
|
2#

楼主 |
发表于 2008-11-1 10:16:49
|
只看该作者

.NET可复用TCP通信层之消息分派器组件
IDataStreamHeader即是我们所说的消息的“元数据”,如其名所示,它也是消息的“消息头”。请让我补充说明一下,依照我的经验,消息由消息头Header和消息主体Body组成,消息头用于存放消息的“元数据”等信息,而消息主体用于存放与特定请求相关的数据。消息头的长度固定,比如都是64字节或都是128字节。请求消息和回复消息公用相同格式的消息头。我们来看看消息头接口IDataStreamHeader的定义:
以下为引用的内容:
public interface IDataStreamHeader
{
int MessageLength {get ;set ;} //本消息长度
int TypeKey {get ;set ;} //请求的目录类型
int ServiceKey {get ;set ;} //请求类型
int ServiceItemIndex{get ;set ;} //请求细分索引
int RandomNum {get ;set ;} //用于将回复与请求一一对应起来
int Result {get ;set ;} //服务结果
string UserID {get ;set ;} //发出请求的用户编号
byte[] ToDataStream() ; //将消息头转化为流,流的长度位消息头的长度
void ToDataStream(byte[] buff ,int offset);
}
需要解释一下TypeKey、ServiceKey、ServiceItemIndex,我们实际上将服务类型分为三级,可以举个不太恰当的例子让大家有个感性的认识。比如,生活中的衣、食、住、行可以作为不同的TypeKey,而“衣”中的春装、冬装可作为ServiceKey,而“春装”中的T恤、夹克可作为ServiceItemIndex。对于服务的类型,你可以根据自己的意愿分成任意层级,但据我的经验,通常情况下,三层已经够用了。
(2)消息分裂器
前面已经多次提到消息分裂器MessageSplitter,它用于将接收缓冲区中的数据分裂成一个个完整的消息,并且把余下的非完整数据返回,其接口定义如下:
以下为引用的内容:
public interface IMessageSplitter
{
void Initialize(int maxBuffSize ,int headerLen ,int offSetLenField ,LengthTypeInHeader lenType) ;
ArrayList SplitRequestMsgs(byte[] buff ,int validCount , out byte[] leftData) ;//ArrayList 中每条记录都是是byte[],表示一个完整的请求
}
//消息头中的长度是body长度还是总长度
public enum LengthTypeInHeader
{
TotalLen ,BodyLen
}
其中,Initialize方法中的参数都可以由IDataStreamHeader提供。leftData是余下的非完整消息的数据。SplitRequestMsgs方法返回的集合中是一条条完整的请求消息。
(3)消息处理器工厂
消息处理器工厂根据消息的类型(TypeKey、ServiceKey)创建对应的消息处理器来出来该消息,其接口定义如下:
以下为引用的内容:
public interface IRequestDealerFactory
{
IRequestDealer CreateDealer(int requestType ,int serverTypeKey) ;//serverTypeKey 比如城市代号
event CbackRequestRecieved RequestRecieved ;
}
CreateDealer方法返回的IRequestDealer就是消息处理器,每一个消息处理器用于处理某种特定类型(ServiceKey)的所有请求。通常,可以将消息处理器封装成插件DLL,以实现功能服务的“热插拔”。
(4)消息处理器
消息处理器IRequestDealer定义如下:
以下为引用的内容:
public interface IRequestDealer
{
byte[] DealRequestMessage(RoundedRequestMsg reqMsg ) ;//同步回复
event CbackRequestRecieved RequestRecieved ;
}
public delegate void CbackRequestRecieved(RoundedRequestMsg roundedMsg) ;
/// <summary>
/// RoundedRequestMsg 对应于一条完整的请求
/// </summary>
public struct RoundedRequestMsg
{
public int ConnectID ; //请求所对应的Tcp连接
public byte[] Data ;
}
RoundedRequestMsg.Data是经消息分裂器分裂得到的一个完整的请求消息,一个字节不多、一个字节也不少。
(5)ITcpStreamDispatcherHook
ITcpStreamDispatcherHook是一个Hook,它为用户提供了一个自定义的对请求/回复消息进行操作的插入点。ITcpStreamDispatcherHook由TcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理或转换这些消息,比如常用的处理/转换操作包括:加密/解密、消息验证等等。ITcpStreamDispatcherHook定义如下:
以下为引用的内容:
/// <summary>
/// ITcpStreamDispatcherHook 由TcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理转换这些消息,
/// 比如加密/解密。
/// </summary>
public interface ITcpStreamDispatcherHook
{
//转换消息
byte[] CaptureRequestMsg(byte[] roundedMsg) ;
byte[] CaptureRespondMsg(byte[] roundedMsg) ;
//验证消息,以下验证的消息是还没有被捕获的消息
bool VerifyFirstMsgOfUser(byte[] roundedMsg ,ref RequestValidation validation) ;
bool VerifyOtherMessage(byte[] roundedMsg ,ref RequestValidation validation) ;
}
关于这个接口中各方法的含义可以在消息分派器的实现中更好的领会!
3.消息分派器实现
在前述的基本元素的基础上,实现消息分派器非常简单,我们来看其核心方法DealRequestMessage的实现源码:
以下为引用的内容:
private IMessageSplitter curMsgSplitter = new MessageSpliter() ;
private IDataStreamHelper curMsgHelper ; //必须设置
private IRequestDealerFactory curDealerFactory ; //必须设置
private ITcpStreamDispatcherHook tcpStreamDispatcherHook ;
public ArrayList DealRequestMessage(RequestData requestData, out byte[] leftData, ref RequestValidation validation)
{
//消息分裂
ArrayList respondList = new ArrayList() ;
ArrayList reqList = this.curMsgSplitter.SplitRequestMsgs(requestData.Buff ,requestData.ValidCount ,out leftData) ;
if(reqList == null)
{
return respondList ;
}
bool verified = true ;
for(int i=0; i<reqList.Count ;i++)
{
byte[] theData = (byte[])reqList ;
#region 验证消息
if(requestData.IsFirstMsg && (i == 0))
{
verified = this.tcpStreamDispatcherHook.VerifyFirstMsgOfUser(theData ,ref validation) ;
}
else
{
verified = this.tcpStreamDispatcherHook.VerifyOtherMessage(theData ,ref validation ) ;
}
if(! verified)
{
if(validation.gotoCloseConnection)
{
return null ;
}
this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(theData ,ServiceFailureType.InvalidMessge)) ;
continue ;
}
#endregion
//接插,捕获/转换请求消息
byte[] reqData = this.tcpStreamDispatcherHook.CaptureRequestMsg(theData) ;
#region 处理消息
//处理消息
IDataStreamHeader header = this.curMsgHelper.ParseMessageHeader(reqData ,0);
IRequestDealer dealer = this.curDealerFactory.CreateDealer(header.ServiceKey ,header.TypeKey) ;
if(dealer == null)
{
this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(reqData ,ServiceFailureType.ServiceIsNotExit)) ;
continue ;
}
RoundedRequestMsg roundReqMsg = new RoundedRequestMsg();
roundReqMsg.ConnectID = requestData.ConnectID ;
roundReqMsg.Data = reqData ;
try
{
byte[] respondData = dealer.DealRequestMessage(roundReqMsg) ;
if(respondData != null)
{
this.AddRespondToList(respondList ,respondData) ;
}
}
catch(Exception ee)
{
this.AddRespondToList(respondList , this.curMsgHelper.GetRespondWhenFailure(reqData ,ee.Message)) ;
}
#endregion
}
return respondList;
}
//将回复消息加密后放入list
private void AddRespondToList(ArrayList list ,byte[] theRespondData)
{
//接插,捕获/转换回复消息
byte[] respondData = this.tcpStreamDispatcherHook.CaptureRespondMsg(theRespondData) ;
list.Add(respondData) ;
} |
|