async def parse_message(
self, message: Union[Msg, List[Msg]], *, path: Optional[AnyDict] = None
) -> Union[StreamMessage[Msg], StreamMessage[List[Msg]],]:
if isinstance(message, list):
return NatsMessage(
is_js=self.is_js,
raw_message=message, # type: ignore[arg-type]
body=[m.data for m in message],
)
else:
path_re: Optional[Pattern[str]]
if (
path is None
and (handler := context.get_local("handler_"))
and (path_re := handler.path_regex) is not None
and (match := path_re.match(message.subject)) is not None
):
path = match.groupdict()
headers = message.header or {}
return NatsMessage(
is_js=self.is_js,
raw_message=message,
body=message.data,
path=path or {},
reply_to=headers.get("reply_to", "") if self.is_js else message.reply,
headers=headers,
content_type=headers.get("content-type", ""),
message_id=headers.get("message_id", str(uuid4())),
correlation_id=headers.get("correlation_id", str(uuid4())),
)