async def app(request: Request) -> Response:
......
solved_result = await solve_dependencies(
request=request,
dependant=dependant,
body=body,
dependency_overrides_provider=dependency_overrides_provider,
)
values, errors, background_tasks, sub_response, _ = solved_result
if errors:
raise RequestValidationError(errors, body=body)
else:
raw_response = await run_endpoint_function(
dependant=dependant, values=values, is_coroutine=is_coroutine
)
......
这里是endpoint的前一步,request首先要经过solve_dependencies()
来与endpoint的依赖树进行匹配,这相当于API的门卫一般的存在。
匹配的结果包含在solve过程中的结果,错误,以及其他若干信息。有了详细的信息,我们便能精确的定位到问题的所在。
async def solve_dependencies(
*,
request: Union[Request, WebSocket],
dependant: Dependant,
body: Optional[Union[Dict[str, Any], FormData]] = None,
background_tasks: Optional[BackgroundTasks] = None,
response: Optional[Response] = None,
dependency_overrides_provider: Optional[Any] = None,
dependency_cache: Optional[Dict[Tuple[Callable, Tuple[str]], Any]] = None,
) -> Tuple[
Dict[str, Any],
List[ErrorWrapper],
Optional[BackgroundTasks],
Response,
Dict[Tuple[Callable, Tuple[str]], Any],
]:
"""
:param request: 请求报文
:param dependant: endpoint对应的依赖树
:param body: 请求体
:param background_tasks: 后台任务
:param response: 子依赖
:param dependency_overrides_provider: app中设置的依赖替代项
:param dependency_cache: 已完成的依赖
:return:
"""
values: Dict[str, Any] = {}
errors: List[ErrorWrapper] = []
response = response or Response(
content=None,
status_code=None, # type: ignore
headers=None,
media_type=None,
background=None,
)
dependency_cache = dependency_cache or {}
开始解依赖树
for sub_dependant in dependant.dependencies:
sub_dependant.call = cast(Callable, sub_dependant.call)
sub_dependant.cache_key = cast(
Tuple[Callable, Tuple[str]], sub_dependant.cache_key
)
# cast的作用是标注类型,方便提示
call = sub_dependant.call
use_sub_dependant = sub_dependant
# 分别拿到依赖内容和依赖项
if (
dependency_overrides_provider
and dependency_overrides_provider.dependency_overrides
):
# 依赖重写时
original_call = sub_dependant.call
call = getattr(
dependency_overrides_provider, "dependency_overrides", {}
).get(original_call, original_call)
# 找到对应的重写
use_path: str = sub_dependant.path # type: ignore
use_sub_dependant = get_dependant(
path=use_path,
call=call,
name=sub_dependant.name,
security_scopes=sub_dependant.security_scopes,
)
# 重新生成依赖
use_sub_dependant.security_scopes = sub_dependant.security_scopes
solved_result = await solve_dependencies(
request=request,
dependant=use_sub_dependant,
body=body,
background_tasks=background_tasks,
response=response,
dependency_overrides_provider=dependency_overrides_provider,
dependency_cache=dependency_cache,
)
# 获得依赖项的子依赖的结果集
(
sub_values,
sub_errors,
background_tasks,
_, # 子依赖项返回与我们相同的响应
sub_dependency_cache,
) = solved_result
# 拿到结果和错误
上面这部分负责解决子依赖,拿到最后的结果集,然后用子依赖的结果集来解决自身
dependency_cache.update(sub_dependency_cache)
# 将子依赖已解决的内容注册
if sub_errors:
errors.extend(sub_errors)
continue
if sub_dependant.use_cache and sub_dependant.cache_key in dependency_cache:
# 如果设置了使用缓存,且该依赖内容已被子依赖解决过
solved = dependency_cache[sub_dependant.cache_key]
# 直接抄答案
# 否则照常执行
elif is_gen_callable(call) or is_async_gen_callable(call):
stack = request.scope.get("fastapi_astack")
if stack is None:
raise RuntimeError(
async_contextmanager_dependencies_error
) # pragma: no cover
solved = await solve_generator(
call=call, stack=stack, sub_values=sub_values
)
# 用子依赖得到的结果集,作为参数,传入到依赖内容中。
elif is_coroutine_callable(call):
solved = await call(**sub_values)
else:
solved = await run_in_threadpool(call, **sub_values)
# 对于同步异步不同的执行方式
if sub_dependant.name is not None:
values[sub_dependant.name] = solved
# 收集结果
if sub_dependant.cache_key not in dependency_cache:
dependency_cache[sub_dependant.cache_key] = solved
# 将结果添加到结果集
解决该节点的解决每一个依赖项
# 依赖项不再有子依赖时,会直接跳过上面的for循环
path_values, path_errors = request_params_to_args(
dependant.path_params, request.path_params
)
query_values, query_errors = request_params_to_args(
dependant.query_params, request.query_params
)
header_values, header_errors = request_params_to_args(
dependant.header_params, request.headers
)
cookie_values, cookie_errors = request_params_to_args(
dependant.cookie_params, request.cookies
)
# 生成依赖树时,我们将需要的参数,保存到了path_params,query_params等地方。
# 现在就是从Request中提取它们的好时机
# 当然这个过程中也会产生错误,我们会收集这些错误
values.update(path_values)
values.update(query_values)
values.update(header_values)
values.update(cookie_values)
errors += path_errors + query_errors + header_errors + cookie_errors
# 合并现有错误
解决参数依赖,分别遍历依赖的个参数需求列表,与request所能提供的做匹配。
我们来看一下匹配的函数
def request_params_to_args(
required_params: Sequence[ModelField],
received_params: Union[Mapping[str, Any], QueryParams, Headers],
) -> Tuple[Dict[str, Any], List[ErrorWrapper]]:
values = {}
errors = []
for field in required_params:
if is_scalar_sequence_field(field) and isinstance(
received_params, (QueryParams, Headers)
):
value = received_params.getlist(field.alias) or field.default
else:
value = received_params.get(field.alias)
# 分别对标准序列参数和其他参数的情况进行处理,拿到value
# 这里未处理默认值的情况下
field_info = field.field_info
assert isinstance(
field_info, params.Param
), "Params must be subclasses of Param"
if value is None:
if field.required:
errors.append(
ErrorWrapper(
MissingError(), loc=(field_info.in_.value, field.alias)
)
)
# 必须则引发错误
else:
values[field.name] = deepcopy(field.default)
# 否则使用默认值
continue
v_, errors_ = field.validate(
value, values, loc=(field_info.in_.value, field.alias)
)
if isinstance(errors_, ErrorWrapper):
errors.append(errors_)
elif isinstance(errors_, list):
errors.extend(errors_)
else:
values[field.name] = v_
return values, errors
回到solve_dependencies
if dependant.body_params:
# 如果有user_info(user: User)这样的参数,User为Model。
# 其会保存到body_params中,现在是处理它们的时候
(
body_values,
body_errors,
) = await request_body_to_args( # body_params checked above
required_params=dependant.body_params, received_body=body
)
# body传入进去,进行匹配,得到结果
values.update(body_values)
errors.extend(body_errors)
# 整合结果
if dependant.http_connection_param_name:
values[dependant.http_connection_param_name] = request
if dependant.request_param_name and isinstance(request, Request):
values[dependant.request_param_name] = request
elif dependant.websocket_param_name and isinstance(request, WebSocket):
values[dependant.websocket_param_name] = request
# 这三者是解决需要特定参数的时候,主要是指Request或WebSocket这样的参数
if dependant.background_tasks_param_name:
if background_tasks is None:
background_tasks = BackgroundTasks()
values[dependant.background_tasks_param_name] = background_tasks
# 后台任务
if dependant.response_param_name:
values[dependant.response_param_name] = response
# 如果需要操作Response报文,这里会提供sub response,其内容最后会整合到response中
if dependant.security_scopes_param_name:
values[dependant.security_scopes_param_name] = SecurityScopes(
scopes=dependant.security_scopes
)
# 拿到安全域
return values, errors, background_tasks, response, dependency_cache
solve_dependencies
本身也是递归函数,这和get_dependant
是相辅相成的。