@classmethod
async def parse_message(
cls,
message: Union[OneMessage, BatchMessage],
) -> Union[OneRedisMessage, BatchRedisMessage]:
id_ = str(uuid4())
if message["type"] == "batch":
data = dump_json(
[cls.parse_one_msg(x)[0] for x in message["data"]]
).encode()
return BatchRedisMessage(
raw_message=message,
body=data,
content_type="application/json",
message_id=id_,
correlation_id=id_,
)
else:
data, headers = cls.parse_one_msg(message["data"])
channel = message.get("channel", b"").decode()
handler = context.get_local("handler_")
path_re: Optional[Pattern[str]]
path: AnyDict = {}
if (
handler
and handler.channel is not None
and (path_re := handler.channel.path_regex) is not None
):
if path_re is not None:
match = path_re.match(channel)
if match:
path = match.groupdict()
return OneRedisMessage(
raw_message=message,
body=data,
path=path,
headers=headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type", ""),
message_id=message.get("message_id", id_),
correlation_id=headers.get("correlation_id", id_),
)