Bases: BrokerRouter[str, Msg]
Source code in faststream/nats/shared/router.py
def __init__(
    self,
    prefix: str = "",
    handlers: Sequence[NatsRoute[Msg, SendableMessage]] = (),
    **kwargs: Any,
) -> None:
    """Normalise each route's subject, apply the prefix, then initialise the base router.

    Each handler in ``handlers`` is modified in place: its subject is taken
    from the ``subject`` keyword argument if present and truthy (the key is
    removed from ``kwargs`` either way), otherwise from the first positional
    argument. The prefixed subject is then stored as the first positional
    argument of the handler.

    Args:
        prefix: String prepended to every handler subject.
        handlers: Routes to register; their ``args``/``kwargs`` are mutated.
        **kwargs: Passed through unchanged to the parent ``__init__``.
    """
    for route in handlers:
        subject = route.kwargs.pop("subject", None)
        if not subject:
            # No usable keyword subject: consume the first positional argument.
            subject, route.args = route.args[0], route.args[1:]
        route.args = (prefix + subject, *route.args)

    super().__init__(prefix, handlers, **kwargs)
|
include_in_schema instance-attribute
include_in_schema = include_in_schema
prefix instance-attribute
include_router
include_router(
router: BrokerRouter[PublisherKeyType, MsgType]
) -> None
Registers the given router's handlers and publishers on the current router.
PARAMETER | DESCRIPTION |
router | The router to be included. TYPE: BrokerRouter[PublisherKeyType, MsgType] |
Source code in faststream/broker/router.py
def include_router(self, router: "BrokerRouter[PublisherKeyType, MsgType]") -> None:
    """Merge another router's handlers and publishers into this one.

    Every handler stored on ``router`` is re-registered through
    ``self.subscriber`` using its original positional and keyword arguments.
    Every publisher of ``router`` is passed through
    ``_update_publisher_prefix`` with this router's prefix and stored under
    the key returned by ``_get_publisher_key``. A publisher that is already
    registered under that key is kept and the incoming one is ignored.

    Args:
        router: The router to be included.
    """
    for handler in router._handlers:
        self.subscriber(*handler.args, **handler.kwargs)(handler.call)

    for publisher in router._publishers.values():
        prefixed = self._update_publisher_prefix(self.prefix, publisher)
        # setdefault keeps the existing entry when the key is already taken.
        self._publishers.setdefault(self._get_publisher_key(prefixed), prefixed)
|
include_routers
include_routers(
*routers: BrokerRouter[PublisherKeyType, MsgType]
) -> None
Includes routers in the object.
PARAMETER | DESCRIPTION |
*routers | Variable length argument list of routers to include. TYPE: BrokerRouter[PublisherKeyType, MsgType] DEFAULT: () |
Source code in faststream/broker/router.py
def include_routers(
    self, *routers: "BrokerRouter[PublisherKeyType, MsgType]"
) -> None:
    """Include several routers, one after another.

    Each positional argument is handed to ``self.include_router`` in the
    order given; calling with no arguments does nothing.

    Args:
        *routers: Routers to include.
    """
    for router in routers:
        self.include_router(router)
|
publisher abstractmethod
publisher(
subj: str, *args: Any, **kwargs: Any
) -> BasePublisher[MsgType]
Creates a publisher for the given subject. This method is abstract: subclasses must implement it.
PARAMETER | DESCRIPTION |
subj | Subject of the message TYPE: str |
*args | Additional arguments TYPE: Any DEFAULT: () |
**kwargs | Additional keyword arguments TYPE: Any DEFAULT: {} |
Source code in faststream/broker/router.py
@abstractmethod
def publisher(
    self,
    subj: str,
    *args: Any,
    **kwargs: Any,
) -> BasePublisher[MsgType]:
    """Create a publisher for ``subj``; must be overridden by subclasses.

    This base definition is abstract and has no behaviour of its own: it
    raises unconditionally.

    Args:
        subj: Subject the publisher is created for.
        *args: Additional positional arguments for the concrete implementation.
        **kwargs: Additional keyword arguments for the concrete implementation.

    Returns:
        A ``BasePublisher`` (per the annotation) in concrete implementations.

    Raises:
        NotImplementedError: Always, when the base implementation is called.
    """
    raise NotImplementedError
|
subscriber
subscriber(
subject: str, **broker_kwargs: Any
) -> Callable[
[Callable[P_HandlerParams, T_HandlerReturn]],
HandlerCallWrapper[
Msg, P_HandlerParams, T_HandlerReturn
],
]
Source code in faststream/nats/shared/router.py
@override
def subscriber(  # type: ignore[override]
    self,
    subject: str,
    **broker_kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[Msg, P_HandlerParams, T_HandlerReturn],
]:
    """Return a subscriber decorator for ``subject`` with the router prefix applied.

    Args:
        subject: Subject name; ``self.prefix`` is prepended to it.
        **broker_kwargs: Forwarded unchanged to ``_wrap_subscriber``.

    Returns:
        The result of ``_wrap_subscriber``, annotated as a decorator that
        turns a handler callable into a ``HandlerCallWrapper``.
    """
    prefixed_subject = self.prefix + subject
    return self._wrap_subscriber(prefixed_subject, **broker_kwargs)
|