Skip to content

NatsParser

faststream.nats.parser.NatsParser #

NatsParser(is_js: bool)
Source code in faststream/nats/parser.py
def __init__(self, is_js: bool) -> None:
    """Create a parser and remember the ``is_js`` flag.

    Args:
        is_js: Flag stored on the instance as ``self.is_js``.
            NOTE(review): presumably marks JetStream mode — confirm.
    """
    self.is_js = is_js

is_js instance-attribute #

is_js = is_js

decode_message async #

decode_message(
    msg: Union[StreamMessage[Msg], StreamMessage[List[Msg]]]
) -> DecodedMessage
Source code in faststream/nats/parser.py
async def decode_message(
    self,
    msg: Union[
        StreamMessage[Msg],
        StreamMessage[List[Msg]],
    ],
) -> DecodedMessage:
    """Decode the body of a single message or of every message in a batch.

    Args:
        msg: Stream message whose ``raw_message`` is either one raw message
            or a list of raw messages.

    Returns:
        The result of the module-level ``decode_message`` helper for a
        single message. For a batch, a list with one decoded item per raw
        message, in the same order (note that the annotation only names
        ``DecodedMessage``).
    """
    raw = msg.raw_message

    # Single message: hand it straight to the module-level decoder.
    if not isinstance(raw, list):
        return decode_message(msg)

    decoded = []
    # The first parse receives ``None`` as the path; every later parse is
    # given the path taken from the previously parsed message.
    shared_path: Optional[AnyDict] = None
    for item in raw:
        parsed = await self.parse_message(item, path=shared_path)
        shared_path = parsed.path
        decoded.append(decode_message(parsed))

    return decoded

parse_message async #

parse_message(
    message: Union[Msg, List[Msg]],
    *,
    path: Optional[AnyDict] = None
) -> Union[StreamMessage[Msg], StreamMessage[List[Msg]]]
Source code in faststream/nats/parser.py
async def parse_message(
    self, message: Union[Msg, List[Msg]], *, path: Optional[AnyDict] = None
) -> Union[StreamMessage[Msg], StreamMessage[List[Msg]],]:
    """Wrap a raw message, or a list of raw messages, into a ``NatsMessage``.

    Args:
        message: One raw message or a list of raw messages.
        path: Already-resolved path parameters. When ``None`` (single
            message only), they are derived by matching ``message.subject``
            against the ``path_regex`` of the handler found in the local
            context under ``"handler_"``, if there is one.

    Returns:
        A ``NatsMessage``. For a list, its body is the list of each item's
        ``data`` and no path or header fields are passed. For a single
        message, path, reply-to, headers, content type, message id and
        correlation id are filled in as well.
    """
    # Batch: only the raw list and the collected payloads are passed on.
    if isinstance(message, list):
        payloads = [m.data for m in message]
        return NatsMessage(
            is_js=self.is_js,
            raw_message=message,  # type: ignore[arg-type]
            body=payloads,
        )

    # Resolve path parameters from the subject only when the caller did not
    # supply them; each step bails out as soon as something is missing.
    if path is None:
        handler = context.get_local("handler_")
        if handler:
            path_re: Optional[Pattern[str]] = handler.path_regex
            if path_re is not None:
                found = path_re.match(message.subject)
                if found is not None:
                    path = found.groupdict()

    headers = message.header or {}

    # With ``is_js`` set, the reply subject is read from the headers;
    # otherwise it comes from the raw message itself.
    if self.is_js:
        reply_to = headers.get("reply_to", "")
    else:
        reply_to = message.reply

    return NatsMessage(
        is_js=self.is_js,
        raw_message=message,
        body=message.data,
        path=path or {},
        reply_to=reply_to,
        headers=headers,
        content_type=headers.get("content-type", ""),
        # A fresh UUID is the fallback when the header is absent.
        message_id=headers.get("message_id", str(uuid4())),
        correlation_id=headers.get("correlation_id", str(uuid4())),
    )