firefly(暗黑世界版v1.4)教程(二) 节点

第一章我们讲了程序的基本启动流程,这里面涉及到了各种节点信息,master,net,gate,game...,当然,无论他叫什么名字,归根结底他都是一个节点。就好比爷爷爸爸儿子,就像是父父节点,父节点,子节点,不管在家里地位如何,他终究是人。那么是如何实现这个分布式节点的呢,我们这章进入firefly源码内一探究竟
节点相关的文件都存放在distributed,也就是分布式模块当中
  • distributed模块说明

该模块主要封装了关于节点的所有方法以及类

  • distributed模块结构解析
ade7e1523ddb8050bee21f8f937a6727.jpg

PBRoot,root节点对象(所有拥有child节点的都是root节点,root节点本身可以有root节点,好比爷爷是root节点,爸爸是root节点,因为他们都有儿子,但是儿子就只能是child节点)

ChildsManager,子节点管理基类
Child 对象对应的是连接到本服务进程的某个服务进程对象。称为子节点对象

RemoteObject远程调用对象,所有child对象都是一个远程调用对象,远程调用对象封装了一个child对象和child对象的远程代理通道(用于反向调用),以及service对象(用于提供远程调用的方法,在下面你会看到很多他的身影,你可先不用管他是什么,只需要知道他能提供被调用的方法就可以了,因为在下一章,我们会详细描述service)

  • 分布式模块的文件结构如下

模块结构如下:

文件结构:
-distributed
    -__init__.py
    -child.py
    -manager.py
    -node.py
    -reference.py
    -root.py
  • - _ init _.py
#该文件本身并没有什么意义,存在的作用是将distributed文件变成一个可以导入的包
  • -child.py
# 没有子节点的节点统称为child节点,这类节点一般用于提供各类服务
'''
Created on 2013-8-14

@author: lan (www.9miao.com)
'''
class Child(object):
    '''子节点对象'''
    
    def __init__(self,cid,name):
        '''初始化子节点对象
        '''
        self._id = cid
        self._name = name
        self._transport = None
        
    def getName(self):
        '''获取子节点的名称'''
        return self._name
        
    def setTransport(self,transport):
        '''设置子节点的通道'''
        self._transport = transport
        
    def callbackChild(self,*args,**kw):
        '''回调子节点的接口
        return a Defered Object (recvdata)
        '''
        recvdata = self._transport.callRemote('callChild',*args,**kw)
        return recvdata
  • manager.py
# manager 顾名思义,是一个管理类
# 在分布式节点中,既然有子节点,那么就会有父节点。
# 而有了根节点,就需要赋予父节点管理子节点的功能
# manager 的出现就是为了这个功能 
'''
Created on 2013-8-14

@author: lan (www.9miao.com)
'''
from twisted.python import log
from zope.interface import Interface
from zope.interface import implements

class _ChildsManager(Interface): #接口类(这里用到接口这种开发模式,有兴趣的可以看一下)
    '''节点管理器接口'''
    
    def __init__(self):
        '''初始化接口'''
        
    def getChildById(self,childId):
        '''根据节点id获取节点实例'''
        
    def getChildByName(self,childname):
        '''根据节点的名称获取节点实例'''
        
    def addChild(self,child):
        '''添加一个child节点
        @param child: Child object
        '''
    
    def dropChild(self,*arg,**kw):
        '''删除一个节点'''
        
    def callChild(self,*args,**kw):
        '''调用子节点的接口'''
        
    def callChildByName(self,*args,**kw):
        '''调用子节点的接口
        @param childname: str 子节点的名称
        '''
    
    def dropChildByID(self,childId):
        '''删除一个child 节点
        @param childId: Child ID 
        '''
        
    def dropChildSessionId(self, session_id):
        """根据session_id删除child节点
        """

@implementer(_ChildsManager) 
class ChildsManager(object):
'''
子节点管理器
因为装饰器implementer
用户必须得完成_ChildsManager的所有方法
'''    
    
    def __init__(self):
        '''
          初始化子节点管理器
        '''
        self._childs = {} 
        #第一步初始化子节点字典
        #所有子节点对象都将以键值对的形式存储在这里
        #key -> child的ID
        #value -> child 对象,也就是子节点

    def getChildById(self,childId):
        '''
          根据节点的ID获取节点实例
        '''
        return self._childs.get(childId)
    
    def getChildByName(self,childname):
        '''
          根据节点的名称获取节点实例
          遍历子节点
          从中提取节点名称为childname的节点
        '''
        for key,child in self._childs.items():
            if child.getName() == childname:
                return self._childs[key]
        return None
        
    def addChild(self,child):
        '''添加一个child节点
        @param child: Child object
        '''
        key = child._id
        if self._childs.has_key(key):
            raise "child node %s exists"% key
        self._childs[key] = child
        
    def dropChild(self,child):
        '''删除一个child 节点
        @param child: Child Object 
        '''
        key = child._id
        try:
            del self._childs[key]
        except Exception,e:
            log.msg(str(e))
            
    def dropChildByID(self,childId):
        '''通过ID删除一个child 节点
        @param childId: Child ID 
        '''
        try:
            del self._childs[childId]
        except Exception,e:
            log.msg(str(e))
            
    def callChild(self,childId,*args,**kw):
        '''调用子节点的接口
        @param childId: int 子节点的id
        '''
        child = self._childs.get(childId,None)
        if not child:
            log.err("child %s doesn't exists"%childId)
            return
        return child.callbackChild(*args,**kw)
    
    def callChildByName(self,childname,*args,**kw):
        '''通过节点名称调用子节点的接口
        @param childname: str 子节点的名称
        '''
        child = self.getChildByName(childname)
        if not child:
            log.err("child %s doesn't exists"%childname)
            return
        return child.callbackChild(*args,**kw)
    
    def getChildBYSessionId(self, session_id):
        """根据sessionID获取child节点信息
        """
        for child in self._childs.values():
            if child._transport.broker.transport.sessionno == session_id:
                return child
        return None

从代码中,我们基本上可以看出来这个类除了用于管理子节点的添加,移除,获取这种最基本的以外,还有callChild**,callChild函数首先取出子节点,然后再调用子节点的callbackChild函数。从字面意思上,我们基本就可以看出来,这是一个调用子节点函数的函数。至于怎么实现的,可以去看twisted的透明代理部分,官网都有详细的说明以及demo。

  • node.py

介绍了那么多关于子节点的内容,
下面我们正式开始揭开子节点的面纱。

#顾名思义,node.py就是根节点下面的子节点

'''
Created on 2013-8-14

@author: lan (www.9miao.com)
'''
from twisted.spread import pb
from twisted.internet import reactor
reactor = reactor
from reference import ProxyReference

def callRemote(obj,funcName,*args,**kw):
    '''
    远程调用(这里只是作为一个调用子节点的方法存在)
    @param obj 子节点对象,也就是RemoteObject
    @param funcName: str 远程方法名称
    '''
    return obj.callRemote(funcName, *args,**kw)
    
    
class RemoteObject(object):
    '''远程调用对象
      也就是真正的子节点'''
    
    def __init__(self,name):
        '''初始化远程调用对象
        @param port: int 远程分布服的端口号
        @param rootaddr: 根节点服务器地址
        '''
        self._name = name #本节点的节点名称
        self._factory = pb.PBClientFactory() #twisted的透明代理客户端工厂
        self._reference = ProxyReference() #twisted透明代理的代理通道
        self._addr = None #root节点的节点地址(host,port)
        
    def setName(self,name):
        '''设置节点的名称'''
        self._name = name
        
    def getName(self):
        '''获取节点的名称'''
        return self._name
        
    def connect(self,addr):
        '''初始化远程调用对象'''
        self._addr = addr # 保存远程节点地址
        reactor.connectTCP(addr[0], addr[1], self._factory) # 连接远程节点地址
        self.takeProxy() # 获取远程节点地址对象(这是一个异步操作,返回的是延时对象)
        
    def reconnect(self):
        '''重新连接'''
        self.connect(self._addr)
        
    def addServiceChannel(self,service):
        '''
        设置引用对象
        '''
        self._reference.addService(service)
        
    def takeProxy(self):
        '''
        向远程服务端发送代理通道对象
        twisted的双向透明代理规定的步骤
        1.向服务器请求root对象
        2.通过root对象向服务器发送自身的ProxyReference(代理通道)
        3.双向连接完成
        '''
        deferedRemote = self._factory.getRootObject()
        deferedRemote.addCallback(callRemote,'takeProxy',self._name,self._reference)
    
    def callRemote(self,commandId,*args,**kw):
        '''
        这里就是正式的远程调用函数了,子节点调用父节点的方法
        和建立双向透明代理类似,调用的步骤为:
        1.获取远程root对象
        2.通过root对象调用服务器函数
        @param callRemote 这里调用的就是该文件顶部的那个同名函数
        由于 addCallback会将 异步获取的 对象传入Callback 中
        也就是传入 callRemote 这个函数内,所以最终实现的效果是 :
        self._factory.getRootObject().callRemote('callTarget', ,commandId,*args,**kw)
        @param callTarget  这是firefly定义的远程调用函数函数名称
        @param commandId 远程对象会根据 commandId 来最终绝对执行什么操作
        '''
        deferedRemote = self._factory.getRootObject()
        return deferedRemote.addCallback(callRemote,'callTarget',commandId,*args,**kw)

  • reference.py(代理通道)
#coding:utf8
'''
Created on 2013-8-14

@author: lan (www.9miao.com)
'''
from twisted.spread import pb
from firefly.utils.services import Service


class ProxyReference(pb.Referenceable):
    '''代理通道'''
    
    def __init__(self):
        '''初始化'''
        self._service = Service('proxy')
        
    def addService(self,service):
        '''添加一条服务通道'''
        self._service = service
    
    def remote_callChild(self, command,*arg,**kw):
        '''代理发送数据
        '''
        return self._service.callTarget(command,*arg,**kw)

这一部分是代理通道相关的代码,主要实现功能就是在子节点实例化的时候产生一个代理通道,然后在子节点连接上父节点之后通过调用takeProxy函数将自己的代理通道对象发送给父节点,代理成功后,父节点就可以调用子节点中约定的函数(这也被称为pb的反向调用)

  • 父节点对象 -root.py

最后我们来看看分布式节点的中心root节点,当然这里的所谓中心并不代表只有一个root节点,比如暗黑世界里,直观的你就能发现master和gate都是root节点,对于master而言,gate又是他的子节点。同理,game同样可以作为一个root节点(当然在暗黑世界里,game是一个child节点),然后分裂出game1,game2,game3...,但同时,又是gate的子节点,这其中并不矛盾。

# root.py文件,顾名思义,是个根节点

'''
Created on 2013-8-14
分布式根节点
@author: lan (www.9miao.com)
'''
from twisted.python import log
from twisted.spread import pb
from manager import ChildsManager
from child import Child

class BilateralBroker(pb.Broker):
    '''
    
    '''
    def connectionLost(self, reason):
        '''
        这里重写了pb.broker的连接丢失方法,用于子连接丢失后将子节点从根节点中移除
        '''
        clientID = self.transport.sessionno #获取断开的节点id
        log.msg("node [%d] lose"%clientID)
        self.factory.root.dropChildSessionId(clientID)# 移除断开的节点对象
        pb.Broker.connectionLost(self, reason) #断开节点

class BilateralFactory(pb.PBServerFactory):
    '''
    重写 PBServerFactory 类,
    目的只是为了重写pb.Broker的connectionLost方法,
    而重写connectionLost方法只是为了将子节点断开后,
    从根节点中将对象删除,保持根节点对于子节点信息存储的准确性
    '''    
    protocol = BilateralBroker

    
class PBRoot(pb.Root):
    '''
    继承PB 协议的root节点
    '''
    
    def __init__(self,dnsmanager = ChildsManager()):
        '''
       初始化根节点
        '''
        self.service = None 
        self.childsmanager = dnsmanager # 创建子节点管理器
    
    def addServiceChannel(self,service):
        '''添加服务通道
        @param service: Service Object(In bilateral.services)
        '''
        self.service = service # 
        
    def doChildConnect(self,name,transport):
        """当node节点连接时的处理
        """
        pass
    
    def dropChild(self,*args,**kw):
        '''删除子节点记录'''
        self.childsmanager.dropChild(*args,**kw)
        
    def dropChildByID(self,childId):
        '''删除子节点记录'''
        self.doChildLostConnect(childId)
        self.childsmanager.dropChildByID(childId)
        
    def dropChildSessionId(self, session_id):
        '''删除子节点记录'''
        child = self.childsmanager.getChildBYSessionId(session_id)
        if not child:
            return
        child_id = child._id
        self.doChildLostConnect(child_id)
        self.childsmanager.dropChildByID(child_id)
        
    def doChildLostConnect(self,childId):
        """当node节点连接时的处理
        """
        pass
    
    def callChild(self,key,*args,**kw):
        '''调用子节点的接口
        @param childId: int 子节点的id
        return Defered Object
        '''
        return self.childsmanager.callChild(key,*args,**kw)
    
    def callChildByName(self,childname,*args,**kw):
        '''调用子节点的接口
        @param childId: int 子节点的id
        return Defered Object
        '''
        return self.childsmanager.callChildByName(childname,*args,**kw)
    
    # 这里remote_**格式的方法,是twisted透明代理方法的格式,以此命名的方法都可被远程调用
    def remote_takeProxy(self,name,transport):
        '''设置代理通道
        @param addr: (hostname,port)hostname 根节点的主机名,根节点的端口
        '''
        log.msg('node [%s] takeProxy ready'%name)
        child = Child(name,name) # 创建子节点
        self.childsmanager.addChild(child) # 添加子节点
        child.setTransport(transport)  # 设置子节点连接对象
        self.doChildConnect(name, transport) #连接子节点 

    def remote_callTarget(self,command,*args,**kw):
        '''远程调用方法
        @param commandId: int 指令号
        @param data: str 调用参数
        '''
        data = self.service.callTarget(command,*args,**kw)
        return data
  • 最终,我们基本已经清楚分布式布局的核心,互相调用的原理
1. root节点服务会首先建立,监听例如1000端口 root
2. node节点则会在服务器来后第一时间去连接1000端口 
  {node->connect()->root}
3. node与root建立连接成功后,node节点会调用root节点的remote_takeProxy函数用以在root节点互相交换代理权
  {node->takeProxy()->callRemote('register')->childsmanager.addChildByNamePeer()}
4. 至此root和node之间的连接已经建立成功了,当然建立连接的本质意义是为了实现相互调用,那么相互调用又是怎么实现呢?
5. root调用child(这是一个反向调用的过程)
  {root -> childsmanager.callChild() -> child.callbackChild() -> ProxyReference().callRemote()} 
6. child调用root(这是一个正向调用的过程)
  {child -> callRemote()} 
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容