hook调用过程源码解析
调用过程主要涉及到:
- callers 模块中的 _multicall 函数
- hook 模块中的 _HookCaller 类的 __call__ 方法
当我们 add_hookspecs hookspec 以及 register hookimpl
通过 PluginManager.hook.myhook(**kwargs) 之后:
-
调用myhook(实际就是一个 _HookCaller 类的一个实例)的 __call__ 方法
__call__ 作用:检查是否是 historic (historic=True时,是需要通过myhook.call_historic方式传入参数,在register时,自动调用),实际传递参数与 hookspec 是否一致 (不一致并不会导致运行停止,而是 raise 一个UserWarning),实际传递参数是否都是键值对。
最终调用 _hookexec 传入相应的参数。
class _HookCaller:
# 构造函数,实例化传入 hook_execute
def __init__(self, name, hook_execute, specmodule_or_class=None, spec_opts=None):
self.name = name
self._wrappers = []
self._nonwrappers = []
self._hookexec = hook_execute
self._call_history = None
self.spec = None
if specmodule_or_class is not None:
assert spec_opts is not None
self.set_specification(specmodule_or_class, spec_opts)
# 调用初始化时传入的 hook_execute 函数
def __call__(self, *args, **kwargs):
if args:
raise TypeError("hook calling supports only keyword arguments")
assert not self.is_historic()
# This is written to avoid expensive operations when not needed.
# 判断实际调用的hook传入的参数是否与hookspec一致,否则rasie UserWarning
if self.spec:
for argname in self.spec.argnames:
if argname not in kwargs:
notincall = tuple(set(self.spec.argnames) - kwargs.keys())
warnings.warn(
"Argument(s) {} which are declared in the hookspec "
"can not be found in this hook call".format(notincall),
stacklevel=2,
)
break
firstresult = self.spec.opts.get("firstresult")
else:
firstresult = False
# __call__ 的作用,检查是否是historic,是否实际参数与hookspec一致,是否传入参数都是键值对,
# 实际上是调用了callers中的_multicall函数,
# self.get_hookimpls() 返回所有hooimpl,hookwrapper 在后,nonwrapper在前
# 等价于 _multicall(self.name, self.get_hookimpls(), kwargs, firstresult)
return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
-
__call__ 之后,会调用实例化 _HookCaller 时传入的 hook_execute (本质上是一个 callers 模块中的 _multicall 函数)
- _HookCaller 对象是在 register 时实例化的,并且通过 setattr 设置 _HookRelay 对象的一个属性,通过该方式存储所有的 hookimpl ,即 1:N 个
class PluginManager:
def __init__(self, project_name):
self.project_name = project_name
self._name2plugin = {}
self._plugin2hookcallers = {}
self._plugin_distinfo = []
self.trace = _tracing.TagTracer().get("pluginmanage")
self.hook = _HookRelay()
self._inner_hookexec = _multicall
def _hookexec(self, hook_name, methods, kwargs, firstresult):
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
def register(self, plugin, name=None):
""" Register a plugin and return its canonical name or ``None`` if the name
is blocked from registering. Raise a :py:class:`ValueError` if the plugin
is already registered. """
plugin_name = name or self.get_canonical_name(plugin)
if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers:
if self._name2plugin.get(plugin_name, -1) is None:
return # blocked plugin, return None to indicate no registration
raise ValueError(
"Plugin already registered: %s=%s\n%s"
% (plugin_name, plugin, self._name2plugin)
)
# XXX if an error happens we should make sure no state has been
# changed at point of return
self._name2plugin[plugin_name] = plugin
# register matching hook implementations of the plugin
self._plugin2hookcallers[plugin] = hookcallers = []
for name in dir(plugin):
hookimpl_opts = self.parse_hookimpl_opts(plugin, name)
if hookimpl_opts is not None:
normalize_hookimpl_opts(hookimpl_opts)
method = getattr(plugin, name)
hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
name = hookimpl_opts.get("specname") or name
hook = getattr(self.hook, name, None)
if hook is None:
# hook = _HookCaller(name, self._hookexec)
# setattr(self.hook, name, hook)
# _HookCaller 对象是在 register 时实例化的,并且通过 setattr 设置 _HookRelay 对象的一个属性
hook = _HookCaller(name, self._hookexec)
setattr(self.hook, name, hook)
elif hook.has_spec():
self._verify_hook(hook, hookimpl)
hook._maybe_apply_history(hookimpl)
hook._add_hookimpl(hookimpl)
hookcallers.append(hook)
return plugin_name
-
_multicall 的作用:
通过 reversed 函数反转 hook_impls 列表 (get_hookimpls 方法返回的列表,hookwrapper 在后,nonwrapper 在前),所以hookwrapper 优先调用,遍历所有 register 的 hookimpl 对象,并调用相应的 function
如果 hookimpl 对象是一个 hookwrapper (即 hookwrapper=True),则调用相应的 function,返回一个生成器,并 next() 迭代一次, 将此时的迭代状态通过一个 teardowns list 维护, 这里并不会获取 hookwrapper 的返回值,也并不会受 firstresult=True 影响
如果 hookimpl 对象是一个 nonwrapper, 调用 function,并将返回值增加到一个 result 列表中,如果 firstresult=True,则 break 遍历
遍历调用结束后,将实例一个 _Result 对象,传入 result,并且反转 teardowns,继续执行 yield 之后的代码,之前先调用的 hookwrapper,现在最后调用。最终返回 _Result.get_result(),即 result 列表
def _multicall(hook_name, hook_impls, caller_kwargs, firstresult):
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from _HookCaller.__call__().
"""
__tracebackhide__ = True
results = []
excinfo = None
try: # run impl and wrapper setup functions in a loop
teardowns = []
try:
# 通过 reversed 函数反转 hook_impls 列表
# get_hookimpls 方法返回的列表,hookwrapper 在后,nonwrapper 在前
# 所以hookwrapper 优先调用
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,)
)
# 如果 hookimpl 对象是一个 hookwrapper (即 hookwrapper=True),
# 则调用相应的 function,返回一个生成器,并 next() 迭代一次,
# 将此时的迭代状态通过一个 teardowns list 维护,
# 这里并不会获取 hookwrapper 的返回值,也并不会受 firstresult=True 影响
# 如果 hookimpl 对象是一个 nonwrapper, 调用 function,
# 并将返回值增加到一个 result 列表中,如果 firstresult=True,则 break 遍历
if hook_impl.hookwrapper:
try:
gen = hook_impl.function(*args)
next(gen) # first yield
teardowns.append(gen)
except StopIteration:
_raise_wrapfail(gen, "did not yield")
else:
res = hook_impl.function(*args)
if res is not None:
results.append(res)
if firstresult: # halt further impl calls
break
except BaseException:
excinfo = sys.exc_info()
# 遍历调用结束后,将实例一个 _Result 对象,传入 result,并且反转 teardowns,继续执行 yield 之后的代码
# 之前先调用的 hookwrapper,现在最后调用。最终返回 _Result.get_result(),即 result 列表
finally:
if firstresult: # first result hooks return a single value
outcome = _Result(results[0] if results else None, excinfo)
else:
outcome = _Result(results, excinfo)
# run all wrapper post-yield blocks
for gen in reversed(teardowns):
try:
gen.send(outcome)
_raise_wrapfail(gen, "has second yield")
except StopIteration:
pass
return outcome.get_result()