Resolve a custom parser/decoder against the default one, falling back to the default when no custom function is given.
Source code in faststream/broker/utils.py
def resolve_custom_func(
    custom_func: Optional["CustomCallable"],
    default_func: "AsyncCallable",
) -> "AsyncCallable":
    """Resolve a custom parser/decoder against the default one.

    Args:
        custom_func: User-supplied callable, or ``None`` to use the default.
        default_func: Default callable. Returned unchanged when no custom
            callable is given; otherwise bound, by keyword, to the custom
            callable's second parameter.

    Returns:
        ``default_func`` when ``custom_func`` is ``None``;
        ``to_async(custom_func)`` when the custom callable declares exactly
        one parameter; otherwise a ``functools.partial`` around
        ``to_async(custom_func)`` with its second parameter pre-bound to
        ``default_func``.

    Raises:
        TypeError: If ``custom_func`` declares no parameters at all.
    """
    if custom_func is None:
        return default_func

    param_names = tuple(inspect.signature(custom_func).parameters)

    # A callable without parameters previously failed below with an opaque
    # IndexError; fail early with an explicit message instead.
    if not param_names:
        raise TypeError(
            "custom parser/decoder must accept at least one parameter"
        )

    if len(param_names) == 1:
        return to_async(cast(Union["SyncCallable", "AsyncCallable"], custom_func))

    # Bind by the second parameter's name (not by position) so the first
    # positional slot stays free for the caller's argument.
    second_param = param_names[1]
    return partial(to_async(custom_func), **{second_param: default_func})
|