不了解ET的,点击 (https://github.com/egametang/ET)
在ET的Demo中,只有鼠标点击地图让角色移动到点击点的一个帧同步消息,我们就从这个消息的发起到其他客户端接收到这个消息的流程来走一遍。在进入Map后,客户端Scene上会添加一个OperaComponent:Game.Scene.AddComponent<OperaComponent>(); 这个Component会监测鼠标点击,然后将点击消息发送给服务器:
public void Update()
{
if (Input.GetMouseButtonDown(1))
{
Ray ray = Camera.main.ScreenPointToRay(Input.mousePosition);
RaycastHit hit;
if (Physics.Raycast(ray, out hit, 1000, this.mapMask))
{
this.ClickPoint = hit.point;
SessionComponent.Instance.Session.Send(new Frame_ClickMap() { X = (int)(this.ClickPoint.x * 1000), Z = (int)(this.ClickPoint.z * 1000) });
}
}
}
Frame_ClickMap的继承关系是这样的:
Frame_ClickMap的消息在Session中被序列化为byte[],前面加了2个byte的Opcode,即Opcode.Frame_ClickMap。然后经过Channel又在前面加了2个byte的size。
消息发送到GateServer后,被这个Client对应的Session接收到,在RunDecompressedBytes中直接有network的MessageDispatcher进行分发。GateServer上client的Session是属于NetOuterComponent的,其MessageDispatcher是OuterMessageDispatcher,因为Frame_ClickMap是一个AActorMessage,所以GateServer直接转发给MapServer:
public class OuterMessageDispatcher: IMessageDispatcher
{
public async void Dispatch(Session session, Opcode opcode, int offset, byte[] messageBytes, AMessage message)
{
// gate session收到actor消息直接转发给actor自己去处理
if (message is AActorMessage)
{
long unitId = session.GetComponent<SessionPlayerComponent>().Player.UnitId;
ActorProxy actorProxy = Game.Scene.GetComponent<ActorProxyComponent>().Get(unitId);
actorProxy.Send(message);
return;
}
……
}
}
需要注意的是,ActorProxy中会对消息进行包装,通过ActorProxy.Send发送的消息会被包装成一个ActorRequest,通过ActorProxy.Call发送的request则会包装成ActorRpcRequest,这两个消息会带有一个Id,用来标记时哪个Actor,然后发送给MapServer:
public class ActorMessageTask : ActorTask
{
public ActorMessageTask(ActorProxy proxy, AMessage message)
{
this.proxy = proxy;
this.message = message;
}
public override async Task<AResponse> Run()
{
ActorRequest request = new ActorRequest(){ Id = this.proxy.Id, AMessage = this.message };
ActorResponse response = await this.proxy.RealCall<ActorResponse>(request, this.proxy.CancellationTokenSource.Token);
return response;
}
……
}
public class ActorRpcTask<Response> : ActorTask where Response : AResponse
{
[BsonIgnore]
public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();
public ActorRpcTask(ActorProxy proxy, ARequest message)
{
this.proxy = proxy;
this.message = message;
}
public override async Task<AResponse> Run()
{
ActorRpcRequest request = new ActorRpcRequest(){ Id = this.proxy.Id, AMessage = this.message };
ActorRpcResponse response = await this.proxy.RealCall<ActorRpcResponse>(request, this.proxy.CancellationTokenSource.Token);
if (response.Error != ErrorCode.ERR_NotFoundActor)
{
this.Tcs.SetResult((Response)response.AMessage);
}
return response;
}
……
}
public sealed class ActorProxy : Disposer
{
……
public void Send(AMessage message)
{
ActorMessageTask task = new ActorMessageTask(this, message);
this.Add(task);
}
public Task<Response> Call<Response>(ARequest request)where Response : AResponse
{
ActorRpcTask<Response> task = new ActorRpcTask<Response>(this, request);
this.Add(task);
return task.Tcs.Task;
}
public async Task<Response> RealCall<Response>(ActorRequest request, CancellationToken cancellationToken) where Response: AResponse
{
……
//Log.Debug($"realcall {MongoHelper.ToJson(request)} {this.Address}");
request.Id = this.Id;
Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
Response response = (Response)await session.Call(request, cancellationToken);
return response;
……
}
}
同GateServer收到客户端消息一样,MapServer上与这个GateServer连接的Session会收到这个消息,同样转给MessageDispatcher处理,而MapServer的network是NetInnerComponent,对应的是InnerMessageDispatcher,Frame_ClickMap消息被包装在ActorRequest消息中:
public class InnerMessageDispatcher : IMessageDispatcher
{
public void Dispatch(Session session, Opcode opcode, int offset, byte[] messageBytes, AMessage message)
{
……
// 收到actor消息分发给actor自己去处理
if (message is ActorRequest actorRequest)
{
Entity entity = Game.Scene.GetComponent<ActorManagerComponent>().Get(actorRequest.Id);
if (entity == null)
{
Log.Warning($"not found actor: {actorRequest.Id}");
ActorResponse response = new ActorResponse
{
RpcId = actorRequest.RpcId,
Error = ErrorCode.ERR_NotFoundActor
};
session.Reply(response);
return;
}
entity.GetComponent<ActorComponent>().Add(new ActorMessageInfo(){ Session = session, Message = actorRequest });
return;
}
……
}
}
MapServer直接发送给对应unit的ActorComponent了,ActorComponent收到消息后又让entityActorHandler去处理:
private static async void HandleAsync(this ActorComponent self)
{
……
await self.entityActorHandler.Handle(info.Session, self.Parent, info.Message);
……
}
这个entityActorHandler是MapServer上在处理G2M_CreateUnit消息创建unit时为其添加ActorComponent并设置的:
public class G2M_CreateUnitHandler : AMRpcHandler<G2M_CreateUnit, M2G_CreateUnit>
{
protected override async void Run(Session session, G2M_CreateUnit message, Action<M2G_CreateUnit> reply)
{
……
await unit.AddComponent<ActorComponent, IEntityActorHandler>(new MapUnitEntityActorHandler()).AddLocation();
……
}
}
对于帧同步消息(Frame_ClickMap消息是继承于AFrameMessage的),直接放入帧同步消息队列中,注意这里已经把ActorRequest这个壳丢掉了,只把FrameMessage放入了帧同步器中,这个消息在这里设置了unity的Id,之后客户端收到消息后就知道这个消息是针对哪个unit的:
/// <summary>
/// 玩家收到帧同步消息交给帧同步组件处理
/// </summary>
public class MapUnitEntityActorHandler : IEntityActorHandler
{
public async Task Handle(Session session, Entity entity, ActorRequest message)
{
if (message.AMessage is AFrameMessage aFrameMessage)
{
// 客户端发送不需要设置Frame消息的id,在这里统一设置,防止客户端被破解发个假的id过来
aFrameMessage.Id = entity.Id;
Game.Scene.GetComponent<ServerFrameComponent>().Add(aFrameMessage);
ActorResponse response = new ActorResponse
{
RpcId = message.RpcId
};
session.Reply(response);
return;
}
await Game.Scene.GetComponent<ActorMessageDispatherComponent>().Handle(session, entity, message);
}
}
到此玩家点击地图的消息就发送到MapServer了。之后MapServer的帧同步器会定时(一帧)将该帧的消息发送给该MapServer上的所有unit:
public static void Add(this ServerFrameComponent self, AFrameMessage message)
{
self.FrameMessage.Messages.Add(message);
}
public static async void UpdateFrameAsync(this ServerFrameComponent self)
{
TimerComponent timerComponent = Game.Scene.GetComponent<TimerComponent>();
while (true)
{
if (self.Id == 0)
{
return;
}
await timerComponent.WaitAsync(40);
MessageHelper.Broadcast(self.FrameMessage);
++self.Frame;
self.FrameMessage = new FrameMessage(){ Frame = self.Frame };
}
}
跟GateServer上类似,MapServer上会用一个ActorProxy来表示一个unit对应GateServer上的Player,通过这个ActorProxy就可以把消息发送给GateServer:
public static class MessageHelper
{
public static void Broadcast(AActorMessage message)
{
Unit[] units = Game.Scene.GetComponent<UnitComponent>().GetAll();
ActorProxyComponent actorProxyComponent = Game.Scene.GetComponent<ActorProxyComponent>();
foreach(Unit unit in units)
{
// 用gateSessionId在Location服务器上查找到这个unit对应Player的Gate服务器地址,然后把消息发送给Gate服务器
long gateSessionId = unit.GetComponent<UnitGateComponent>().GateSessionId;
actorProxyComponent.Get(gateSessionId).Send(message);
}
}
}
同样的这个消息会被包装成ActorRequest消息,然后在GateServer上与该MapServer连接的Session中处理该消息,同样发送给network的MessageDispatcher处理,但与客户端发送过来的消息不同,这是由MapServer通过InnerAddress发送过来的,所以是交给InnerMessageDispatcher来处理,同上,这个消息交给了ActorComponent,ActorComponent又让entityActorHandler去处理,GateServer上Session的ActorComponent上设置的是GateSessionEntityActorHandler:
public class C2G_LoginGateHandler : AMRpcHandler<C2G_LoginGate, G2C_LoginGate>
{
protected override async void Run(Session session, C2G_LoginGate message, Action<G2C_LoginGate> reply)
{
……
await session.AddComponent<ActorComponent, IEntityActorHandler>(new GateSessionEntityActorHandler()).AddLocation();
……
}
}
/// <summary>
/// gate session收到的消息直接转发给客户端
/// </summary>
public class GateSessionEntityActorHandler : IEntityActorHandler
{
public async Task Handle(Session session, Entity entity, ActorRequest message)
{
ActorResponse response = new ActorResponse{ RpcId = message.RpcId };
try
{
// 注意这里传入的Session是GateServer与MapServer通信的Session,显然我们不能用这个Session来发送消息
// 给客户端。GateServer上ActorComponent是挂在它与客户端通信的Session上的,就是这里的entity。
((Session)entity).Send(message.AMessage);
session.Reply(response);
await Task.CompletedTask;
}
catch (Exception e)
{
response.Error = ErrorCode.ERR_SessionActorError;
response.Message = $"session actor error {e}";
session.Reply(response);
throw;
}
}
}
到这里,该帧的所有帧同步消息就从MapServer发起,经过GateServer,发送给所有在那个MapServer上战斗的客户端了。客户端收到这个消息后进行处理,消息还是由Session接收到并交给network的MessageDispatcher。客户端的NetOuterComponent上设置的是ClientDispatcher:
// 客户端的NetOuterComponent定义
public class NetOuterComponent : NetworkComponent
{
public void Awake()
{
this.Awake(NetworkProtocol.TCP);
this.MessagePacker = new ProtobufPacker();
this.MessageDispatcher = new ClientDispatcher();
}
}
public class ClientDispatcher : IMessageDispatcher
{
public void Dispatch(Session session, Opcode opcode, int offset, byte[] messageBytes, AMessage message)
{
// 如果是帧同步消息,交给ClientFrameComponent处理
FrameMessage frameMessage = message as FrameMessage;
if (frameMessage != null)
{
Game.Scene.GetComponent<ClientFrameComponent>().Add(session, frameMessage);
return;
}
……
}
}
发现这个消息是FrameMessage,直接交给帧同步组件去处理,帧同步组件会以跟服务器同频率处理帧事件,如果发现有帧积累超过4帧,就加速进行追帧:
public class ClientFrameComponent : Component
{
……
public async void UpdateAsync()
{
TimerComponent timerComponent = Game.Scene.GetComponent<TimerComponent>();
while (true)
{
// 如果队列中消息多于4个,则加速跑帧
this.waitTime = maxWaitTime;
if (this.Queue.Count > 4)
{
this.waitTime = maxWaitTime - (this.Queue.Count - 4) * 2;
}
// 最快加速一倍
if (this.waitTime < 20)
{
this.waitTime = 20;
}
await timerComponent.WaitAsync(waitTime);
if (this.Id == 0)
{
return;
}
// 处理服务器发送的一帧的所有帧同步消息
this.UpdateFrame();
}
}
private void UpdateFrame()
{
……
for (int i = 0; i < sessionFrameMessage.FrameMessage.Messages.Count; ++i)
{
AFrameMessage message = sessionFrameMessage.FrameMessage.Messages[i];
Opcode opcode = Game.Scene.GetComponent<OpcodeTypeComponent>().GetOpcode(message.GetType());
// 把每个消息交给MessageDispatherComponent处理
Game.Scene.GetComponent<MessageDispatherComponent>().Handle(sessionFrameMessage.Session,
new MessageInfo(){ Opcode = opcode, Message = message });
}
}
}
MessageDispatherComponent通过注册的Opcode找到Frame_ClickMap对应的消息处理Handler:
[MessageHandler((int)Opcode.Frame_ClickMap)]
public class Frame_ClickMapHandler : AMHandler<Frame_ClickMap>
{
protected override void Run(Session session, Frame_ClickMap message)
{
Unit unit = Game.Scene.GetComponent<UnitComponent>().Get(message.Id);
MoveComponent moveComponent = unit.GetComponent<MoveComponent>();
Vector3 dest = new Vector3(message.X / 1000f, 0, message.Z / 1000f);
moveComponent.MoveToDest(dest, 1);
moveComponent.Turn2D(dest - unit.Position);
}
}
首先找到这个消息是针对哪个unit的,然后设置其MoveComponent中的终点和方向信息,由MoveComponent驱动每帧unit位置的更新。
OK,到这里Frame_ClickMap消息就完成了由一个客户端触发,发送给Gate服务器,再转发给Map服务器,Map服务器添加到帧同步消息队列,每个同步帧广播该帧的所有帧同步消息到每个unit所在Gate服务器,再发送给对应客户端,客户端收到后处理的流程。如果对于消息流程不熟悉的话,可以参考另一篇文章 "ET消息流程"(https://www.jianshu.com/p/f2ecf148bc2f)。