@override
async def publish(
    self,
    message: SendableMessage,
    channel: Optional[str] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[DecodedMessage]:
    """Deliver ``message`` to the handlers of ``self.broker`` that match the destination.

    Every handler in ``self.broker.handlers`` is checked against the
    requested destination and, on a match, invoked through ``call_handler``
    with a message produced by ``build_message``.

    Args:
        message: Payload to deliver.
        channel: Channel name. A handler matches when its channel
            subscription has the same name or, for pattern subscriptions,
            when its glob pattern (``*`` as wildcard) matches the whole
            channel name.
        reply_to: Forwarded to ``build_message``.
        headers: Forwarded to ``build_message``.
        correlation_id: Forwarded to ``build_message``.
        list: List name; matches handlers whose list subscription has
            exactly this name.
        stream: Stream name; matches handlers whose stream subscription
            has exactly this name.
        rpc: When true, return the result of the first matching handler
            instead of continuing with the remaining handlers.
        rpc_timeout: Forwarded to ``call_handler``.
        raise_timeout: Forwarded to ``call_handler``.

    Returns:
        The value returned by ``call_handler`` for the first matching
        handler when ``rpc`` is true, otherwise ``None``.

    Raises:
        ValueError: If none of ``channel``, ``list`` or ``stream`` is set
            to a non-empty value.
    """
    any_of = channel or list or stream
    # Test falsiness rather than ``is None``: ``None or None or ""`` yields
    # ``""``, so an empty ``stream`` used to slip through and silently do
    # nothing, while an empty ``channel`` raised.
    if not any_of:
        raise ValueError(INCORRECT_SETUP_MSG)

    for handler in self.broker.handlers.values():  # pragma: no branch
        call = False
        batch = False

        if channel and (ch := handler.channel) is not None:
            if ch.pattern:
                # Translate the glob into a regex and require it to cover the
                # whole channel name; a prefix-only match would accept
                # channels the pattern does not describe
                # (e.g. "*.test" vs "a.test.extra").
                glob_regex = ch.name.replace(".", "\\.").replace("*", ".*")
                call = re.fullmatch(glob_regex, channel) is not None
            else:
                call = ch.name == channel

        if list and (ls := handler.list_sub) is not None:
            batch = ls.batch
            call = list == ls.name

        if stream and (st := handler.stream_sub) is not None:
            batch = st.batch
            call = stream == st.name

        if call:
            r = await call_handler(
                handler=handler,
                message=build_message(
                    # Batch subscriptions get the payload wrapped in a
                    # single-element list.
                    message=[message] if batch else message,
                    channel=any_of,
                    headers=headers,
                    correlation_id=correlation_id,
                    reply_to=reply_to,
                ),
                rpc=rpc,
                rpc_timeout=rpc_timeout,
                raise_timeout=raise_timeout,
            )

            # In RPC mode the first matching handler's result is the reply;
            # remaining handlers are not invoked.
            if rpc:  # pragma: no branch
                return r

    return None