Skip to content

Handler

faststream.nats.asyncapi.Handler #

Handler(
    subject: str,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    queue: str = "",
    stream: Optional[JStream] = None,
    pull_sub: Optional[PullSub] = None,
    extra_options: Optional[AnyDict] = None,
    graceful_timeout: Optional[float] = None,
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True,
)

Bases: LogicNatsHandler

Source code in faststream/nats/handler.py
def __init__(
    self,
    subject: str,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    queue: str = "",
    stream: Optional[JStream] = None,
    pull_sub: Optional[PullSub] = None,
    extra_options: Optional[AnyDict] = None,
    graceful_timeout: Optional[float] = None,
    # AsyncAPI information
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True,
) -> None:
    reg, path = compile_path(subject, replace_symbol="*")
    self.subject = path
    self.path_regex = reg

    self.queue = queue

    self.stream = stream
    self.pull_sub = pull_sub
    self.extra_options = extra_options or {}

    super().__init__(
        log_context_builder=log_context_builder,
        description=description,
        include_in_schema=include_in_schema,
        title=title,
        graceful_timeout=graceful_timeout,
    )

    self.task = None
    self.subscription = None

call_name property #

call_name: str

calls instance-attribute #

calls: List[
    Tuple[
        HandlerCallWrapper[MsgType, Any, SendableMessage],
        Callable[[StreamMessage[MsgType]], Awaitable[bool]],
        AsyncParser[MsgType, Any],
        AsyncDecoder[StreamMessage[MsgType]],
        Sequence[Callable[[Any], BaseMiddleware]],
        CallModel[Any, SendableMessage],
    ]
]

description property #

description: Optional[str]

extra_options instance-attribute #

extra_options = extra_options or {}

global_middlewares instance-attribute #

global_middlewares: Sequence[
    Callable[[Any], BaseMiddleware]
] = []

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

include_in_schema instance-attribute #

include_in_schema = include_in_schema

lock instance-attribute #

lock = MultiLock()

log_context_builder instance-attribute #

log_context_builder = log_context_builder

path_regex instance-attribute #

path_regex = reg

pull_sub instance-attribute #

pull_sub = pull_sub

queue instance-attribute #

queue = queue

running instance-attribute #

running = False

stream instance-attribute #

stream = stream

subject instance-attribute #

subject = path

subscription instance-attribute #

subscription: Union[
    None,
    Subscription,
    JetStreamContext.PushSubscription,
    JetStreamContext.PullSubscription,
] = None

task class-attribute instance-attribute #

task: Optional[asyncio.Task[Any]] = None

add_call #

add_call(
    *,
    handler: HandlerCallWrapper[
        Msg, P_HandlerParams, T_HandlerReturn
    ],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[CustomParser[Msg, NatsMessage]],
    decoder: Optional[CustomDecoder[NatsMessage]],
    filter: Filter[NatsMessage],
    middlewares: Optional[
        Sequence[Callable[[Msg], BaseMiddleware]]
    ]
) -> None
Source code in faststream/nats/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[Msg, P_HandlerParams, T_HandlerReturn],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[CustomParser[Msg, NatsMessage]],
    decoder: Optional[CustomDecoder[NatsMessage]],
    filter: Filter[NatsMessage],
    middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]],
) -> None:
    parser_ = Parser if self.stream is None else JsParser
    super().add_call(
        handler=handler,
        parser=resolve_custom_func(parser, parser_.parse_message),
        decoder=resolve_custom_func(decoder, parser_.decode_message),
        filter=filter,  # type: ignore[arg-type]
        dependant=dependant,
        middlewares=middlewares,
    )

close async #

close() -> None
Source code in faststream/nats/handler.py
async def close(self) -> None:
    await super().close()

    if self.subscription is not None:
        await self.subscription.unsubscribe()
        self.subscription = None

    if self.task is not None:
        self.task.cancel()
        self.task = None

consume async #

consume(msg: MsgType) -> SendableMessage

Consume a message asynchronously.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: MsgType

RETURNS DESCRIPTION
SendableMessage

The sendable message.

RAISES DESCRIPTION
StopConsume

If the consumption needs to be stopped.

RAISES DESCRIPTION
Exception

If an error occurs during consumption.

Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Consume a message asynchronously.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message.

    Raises:
        StopConsume: If the consumption needs to be stopped.

    Raises:
        Exception: If an error occurs during consumption.

    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    if not self.running:
        return result_msg

    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)

        gl_middlewares: List[BaseMiddleware] = []

        stack.enter_context(context.scope("handler_", self))

        for m in self.global_middlewares:
            gl_middlewares.append(await stack.enter_async_context(m(msg)))

        logged = False
        processed = False
        for handler, filter_, parser, decoder, middlewares, _ in self.calls:
            local_middlewares: List[BaseMiddleware] = []
            for local_m in middlewares:
                local_middlewares.append(
                    await stack.enter_async_context(local_m(msg))
                )

            all_middlewares = gl_middlewares + local_middlewares

            # TODO: add parser & decoder caches
            message = await parser(msg)

            if not logged:  # pragma: no branch
                log_context_tag = context.set_local(
                    "log_context", self.log_context_builder(message)
                )

            message.decoded_body = await decoder(message)
            message.processed = processed

            if await filter_(message):
                assert (  # nosec B101
                    not processed
                ), "You can't proccess a message with multiple consumers"

                try:
                    async with AsyncExitStack() as consume_stack:
                        for m_consume in all_middlewares:
                            message.decoded_body = (
                                await consume_stack.enter_async_context(
                                    m_consume.consume_scope(message.decoded_body)
                                )
                            )

                        result = await cast(
                            Awaitable[Optional[WrappedReturn[SendableMessage]]],
                            handler.call_wrapped(message),
                        )

                    if result is not None:
                        result_msg, pub_response = result

                        # TODO: suppress all publishing errors and raise them after all publishers will be tried
                        for publisher in (pub_response, *handler._publishers):
                            if publisher is not None:
                                async with AsyncExitStack() as pub_stack:
                                    result_to_send = result_msg

                                    for m_pub in all_middlewares:
                                        result_to_send = (
                                            await pub_stack.enter_async_context(
                                                m_pub.publish_scope(result_to_send)
                                            )
                                        )

                                    await publisher.publish(
                                        message=result_to_send,
                                        correlation_id=message.correlation_id,
                                    )

                except StopConsume:
                    await self.close()
                    handler.trigger()

                except HandlerException as e:  # pragma: no cover
                    handler.trigger()
                    raise e

                except Exception as e:
                    handler.trigger(error=e)
                    raise e

                else:
                    handler.trigger(result=result[0] if result else None)
                    message.processed = processed = True
                    if IS_OPTIMIZED:  # pragma: no cover
                        break

        assert (
            not self.running or processed
        ), "You have to consume message"  # nosec B101

    context.reset_local("log_context", log_context_tag)

    return result_msg

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    for h, _, _, _, _, dep in self.calls:
        body = parse_handler_params(
            dep, prefix=f"{self._title or self.call_name}:Message"
        )
        payloads.append((body, to_camelcase(unwrap(h._original_call).__name__)))

    return payloads

get_routing_hash staticmethod #

get_routing_hash(subject: str) -> str
Source code in faststream/nats/handler.py
@staticmethod
def get_routing_hash(subject: str) -> str:
    return subject

name #

name() -> str
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    raise NotImplementedError()

schema #

schema() -> Dict[str, Channel]
Source code in faststream/nats/asyncapi.py
def schema(self) -> Dict[str, Channel]:
    if not self.include_in_schema:
        return {}

    payloads = self.get_payloads()
    handler_name = self._title or f"{self.subject}:{self.call_name}"
    return {
        handler_name: Channel(
            description=self.description,
            subscribe=Operation(
                message=Message(
                    title=f"{handler_name}:Message",
                    payload=resolve_payloads(payloads),
                    correlationId=CorrelationId(
                        location="$message.header#/correlation_id"
                    ),
                ),
            ),
            bindings=ChannelBinding(
                nats=nats.ChannelBinding(
                    subject=self.subject,
                    queue=self.queue or None,
                )
            ),
        )
    }

start async #

start(connection: Union[Client, JetStreamContext]) -> None
Source code in faststream/nats/handler.py
@override
async def start(self, connection: Union[Client, JetStreamContext]) -> None:  # type: ignore[override]
    if self.pull_sub is not None:
        connection = cast(JetStreamContext, connection)

        if self.stream is None:
            raise ValueError("Pull subscriber can be used only with a stream")

        self.subscription = await connection.pull_subscribe(
            subject=self.subject,
            **self.extra_options,
        )
        self.task = asyncio.create_task(self._consume())

    else:
        self.subscription = await connection.subscribe(
            subject=self.subject,
            queue=self.queue,
            cb=self.consume,  # type: ignore[arg-type]
            **self.extra_options,
        )

    await super().start()