大型mmo服务器架构介绍----线程篇

本篇是打算介绍一下目前常见的大型mmo服务器架构的源码,其实目前见过的几个框架在思想上模型基本上大同小异,本人公司的代码由于不方便展示,于是使用开源的框架进行解析,主要理解大致思想,抓重点

有没有想过一个问题?
服务器在启动的一刻,客户端和服务器究竟做了哪些准备工作?以及玩家登陆的时候,客户端又是怎么样和服务器通讯的呢?
本篇也是系列的第一篇,打算先从底层线程的设计开始讲起
该框架将每个任务进行封装,然后通过线程管理器分别分发到对应的线程处理。
也是mmo服务器的基础线程框架。

ThreadObject类

ThreadObject类其实是线程处理事件的一个最小单位
里面包含了需要处理的函数(通过std::function包装)

class Thread;
class ThreadObject : public MessageList
{
public:
    virtual bool Init() = 0; //初始化函数
    virtual void RegisterMsgFunction() = 0;//消息注册
    virtual void Update() = 0;    //更新

    void SetThread(Thread* pThread);
    Thread* GetThread() const;
    bool IsActive() const;
    void Dispose() override;
    
protected:
    bool _active{ true };
    Thread* _pThread{ nullptr };

该类很简单,继承于MessageList(这个后续说)
并且主要成员就两个,_active是否已经执行过,_pThread为当前执行这个线程对象的线程指针(就是这玩意是在哪个线程上执行的)

如何使用这个类呢,只需要继承这个类,然后将这个类的 虚函数实现就行
具体用法后面再结合一起说

Thread类

介绍完object类自然就是介绍Thread类啦,顾名思义 ThreadObject对象就是在某个Thread对象上运行的

class Packet;
class ThreadObject;

class ThreadObjectList: public IDisposable
{
public:
    void AddObject(ThreadObject* _obj);
    void Update();
    void AddPacketToList(Packet* pPacket);
    void Dispose() override;

protected:
    // 本线程的所有对象    
    std::list<ThreadObject*> _objlist;
    std::mutex _obj_lock;
};

class Thread : public ThreadObjectList, public SnObject {
public:
    Thread();
    void Start();
    void Stop();
    bool IsRun() const;
   
private:
    bool _isRun;
    std::thread _thread;
};

可以看到上面的代码,Thread类继承于ThreadObjectList,而ThreadObjectList里就保存了一个list,里面存放所有ThreadObject对象的指针,并且提供了方法接口,将ThreadObject对象指针存入到list中,而主Thread类保存了当前进程的指针,使用的是C++11后的std::thread库,并且也封装了start和stop方法,方便使用。

void ThreadObjectList::AddObject(ThreadObject* obj)
{
    std::lock_guard<std::mutex> guard(_obj_lock);//对当前线程上锁

    // 在加入之前初始化一下
    if (!obj->Init())
    {
        std::cout << "AddObject Failed. ThreadObject init failed." << std::endl;
    }
    else
    {
        obj->RegisterMsgFunction(); //运行threadObject的消息注册函数
        _objlist.push_back(obj); //将threadobject对象指针保持至list
      
        //保持成功后 将当前线程的指针保存至这个threadObject对象的thread成员上
        const auto pThread = dynamic_cast<Thread*>(this);
        if (pThread != nullptr)
            obj->SetThread(pThread);
    }
}

上面的函数就是将ThreadObject对象指针存入ThreadObjectList::_objlist对象的过程,重要的三点:
1:存入之前先对object初始化
2:注册消息函数
3:成功之后保存当前线程指针

Thread::Thread()
{
    this->_isRun = true;
}

void Thread::Stop()
{
    if (!_isRun)
    {
        _isRun = false;
        if (_thread.joinable()) _thread.join();
    }
}

void Thread::Start()
{
    _isRun = true;
    _thread = std::thread([this]()
    {
        while (_isRun)
        {
            Update();
        }
    });
}

bool Thread::IsRun() const
{
    return _isRun;
}

上述代码就是Thread类的方法实现,非常简单明了,这里注意 thread对象只要在运行,那么就会一直执行ThreadObiectList对象的update函数 这也是当前线程的主循环

那么我们来看看update函数都干了些什么吧~

void ThreadObjectList::Update()
{
    std::list<ThreadObject*> _tmpObjs; //
    _obj_lock.lock();
    std::copy(_objlist.begin(), _objlist.end(), std::back_inserter(_tmpObjs));
    _obj_lock.unlock();

    for (ThreadObject* pTObj : _tmpObjs)
    {
        pTObj->ProcessPacket();
        pTObj->Update();

        // 非激活状态,删除
        if (!pTObj->IsActive())
        {
            _obj_lock.lock();
            _objlist.remove(pTObj);
            _obj_lock.unlock();

            pTObj->Dispose();
            delete pTObj;
        }
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

上述代码其实也不难,本质就是把原来的list上的Object拷贝一份,然后对拷贝的这份的每个object指针进行处理
分别对每个object执行
1: processPacket(这个先无视 是MessageList里的方法,理解成处理消息就行)
2:Update(这个函数是虚函数,具体逻辑自己实现,也是我们需要真正执行逻辑的地方)
3:执行完update之后 判断这个object是否已经没用了(被抛弃了) 如果已经没用了,那么就将这个obj移除掉

最终再过一毫秒继续运行该函数

ThreadMgr类

class Packet;
class ThreadObject;
class Network;

class ThreadMgr :public Singleton<ThreadMgr>, public ThreadObjectList
{
public:
    ThreadMgr();
    void StartAllThread();
    bool IsGameLoop();
    void NewThread();
    bool AddObjToThread(ThreadObject* obj);
    void AddNetworkToThread(APP_TYPE appType, Network* pNetwork);

    // message
    void DispatchPacket(Packet* pPacket);
    void SendPacket(Packet* pPacket);

    void Dispose() override;

private:
    Network* GetNetwork(APP_TYPE appType);

private:
    uint64 _lastThreadSn{ 0 }; // 实现线程对象均分

    std::mutex _thread_lock;
    std::map<uint64, Thread*> _threads;

    // NetworkLocator
    std::mutex _locator_lock;
    std::map<APP_TYPE, Network*> _networkLocator;
};

不需要全部看懂意思 只需要大概了解思想
抓重点
1:继承自Singleton<ThreadMgr> 说明每个进程只允许一个对象,不管从哪个线程获取到这个ThreadMgr都是指向同一个对象
2: std::map<uint64, Thread*> _threads; 该对象拥有所有线程对象的指针,并且是以key = 线程id value =线程指针的形式保存
3:可以通过该对象创建新的进程
4:可以通过该对象直接将ThreadObject对象平均分配到线程中,而不需要我们自己决定,实现了负载均衡

接下来看具体方法实现:

void ThreadMgr::StartAllThread()
{
    auto iter = _threads.begin();
    while (iter != _threads.end())
    {
        iter->second->Start();
        ++iter;
    }
}

bool ThreadMgr::IsGameLoop()
{
    for (auto iter = _threads.begin(); iter != _threads.end(); ++iter)
    {
        if (iter->second->IsRun())
            return true;
    }

    return false;
}

void ThreadMgr::NewThread()
{
    std::lock_guard<std::mutex> guard(_thread_lock);
    auto pThread = new Thread();
    _threads.insert(std::make_pair(pThread->GetSN(), pThread));
}

上面代码通俗易懂 不需要解释~

// 平均加入各线程中
bool ThreadMgr::AddObjToThread(ThreadObject* obj)
{
    std::lock_guard<std::mutex> guard(_thread_lock);

    // 找到上一次的线程 
    auto iter = _threads.begin();
    if (_lastThreadSn > 0)
    {
        iter = _threads.find(_lastThreadSn);
    }

    if (iter == _threads.end())
    {
        // 没有找到,可能没有配线程
        std::cout << "AddThreadObj Failed. no thead." << std::endl;
        return false;
    }

    // 取到它的下一个活动线程
    do
    {
        ++iter;
        if (iter == _threads.end())
            iter = _threads.begin();
    } while (!(iter->second->IsRun()));

    auto pThread = iter->second;
    pThread->AddObject(obj);
    _lastThreadSn = pThread->GetSN();
    //std::cout << "add obj to thread.id:" << pThread->GetSN() << std::endl;

    return true;
}

主要来看如何进行负载均衡的
看了代码就能理解,其实就是平均分配
在把obj加入到某个线程之后 记录这个线程的线程id,下次别的ojb来的时候就在这个线程id对应的mapKey的下一位key对应的线程上增加该ojb
假设 线程有 1 2 3
第一次是加到1 上
第二次是加到2 上
第三次是加到3 上
如此反复

那么到此为止,整体的线程框架就能了解了

image.png

MessageList

image.png

上文说道,在ThreadObject上继承了MessageList这个类,那么这个类是啥呢?

先考虑一个问题:
假设一个消息Id为 Msg1,携带着Packet数据,从客户端传来,按照现在的情况是 ThreadMgr管理类会将这个msg分配给每个Thread,并且每个ThreadObject都可以处理这个数据。
但是如果我只希望Thread1里的某个ThreadObject处理这条数据就行了,别的ThreadObject处理别的MsgId,实现消息的过滤,应该怎么办呢?
MessageList的用处就是在这里,实现消息过滤,每个ThreadObject监听自己感兴趣的msgId。
看代码!

class Packet;

typedef std::function<void(Packet*)> HandleFunction;

class MessageList
{
public:
    void RegisterFunction(int msgId, HandleFunction function);//注册这个消息Id 对应的回调
    bool IsFollowMsgId(int msgId);//是否是这个ThreadObjc所需要的消息id
    void ProcessPacket();//处理Packet
    void AddPacket(Packet* pPacket); //添加Patcket

protected:
    std::mutex _msgMutex; //锁
    std::list<Packet*> _msgList; //消息队列
    std::map<int, HandleFunction> _callbackHandle; //回调map 
};

从上面可以大致知道,每个ThreadObject都有一个MessageList,这样每个ThreadObject在声明的时候就可以通过RegisterFunction这个函数来注册自己需要监听的消息。

void MessageList::RegisterFunction(int msgId, HandleFunction function)
{
    std::lock_guard<std::mutex> guard(_msgMutex);
    _callbackHandle[msgId] = function;
}

当消息来到的时候,通过IsFollowMsgId 来判断是不是ThreadObject所需要的msgId,如果是的话,就调用 AddPacket函数,将这个消息的packet保存下来,供后续ProcessPacket函数使用。

void MessageList::AddPacket(Packet* pPacket)
{
    std::lock_guard<std::mutex> guard(_msgMutex);
    _msgList.push_back(pPacket);
}
image.png

上图逻辑就是每个Thread在分发消息的时候 根据每个ThreadObject关心的MsgId进行分发的。

void MessageList::ProcessPacket()
{
    std::list<Packet*> tmpList;
    _msgMutex.lock();
    std::copy(_msgList.begin(), _msgList.end(), std::back_inserter(tmpList));
    _msgList.clear();
    _msgMutex.unlock();

    for (auto packet : tmpList)
    {
        const auto handleIter = _callbackHandle.find(packet->GetMsgId());
        if (handleIter == _callbackHandle.end())
        {
            std::cout << "packet is not hander. msg id;" << packet->GetMsgId() << std::endl;
        }
        else
        {
            handleIter->second(packet);
        }
    }

    tmpList.clear();
}

处理消息的逻辑大致如下
1.声明一个tmpList,存储
2 上锁 把_msgList复制到tmpList里处理 清空_msgList 解锁

  1. 遍历tmpList 获取每个packet的 MsgId,并且通过_callbackHandle map找到对应MsgId需要处理的函数对象,调用该函数。

提问:为什么这里要上锁呢?
其实是因为ThreadMgr是一个单例对象,这个对象在线程1和线程2其实指向的都是同一个对象,那么就会造成共享资源的问题,所以需要对Thread和ThreadObect做访问临界资源上锁的处理。

提问:MsgId是什么
实际上可以理解成 是和服务器和客户端规定的协议 ,比如大家规定当msgId等1时,这个消息代表的含义是XXX。那么双方彼此就可以通过MsgId获取到含义,从而处理这个消息。
目前用的最普遍的消息序列化工具是protobuf

什么是protobuf? 官方文档对 protobuf 的定义:protocol buffers 是一种语言无关、平台无关、可扩展的序列化结构数据的方法,可用于数据通信协议和数据存储等.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,406评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,976评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,302评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,366评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,372评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,457评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,872评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,521评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,717评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,523评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,590评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,299评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,859评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,883评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,127评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,760评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,290评论 2 342

推荐阅读更多精彩内容