async def publish(
self,
message: SendableMessage,
channel: Optional[str] = None,
reply_to: str = "",
headers: Optional[AnyDict] = None,
correlation_id: Optional[str] = None,
*,
list: Optional[str] = None,
stream: Optional[str] = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
) -> Optional[DecodedMessage]:
if not any((channel, list, stream)):
raise ValueError(INCORRECT_SETUP_MSG)
psub: Optional[PubSub] = None
if rpc is True:
if reply_to:
raise WRONG_PUBLISH_ARGS
reply_to = str(uuid4())
psub = self._connection.pubsub()
await psub.subscribe(reply_to)
msg = RawMessage.encode(
message=message,
reply_to=reply_to,
headers=headers,
correlation_id=correlation_id,
)
if channel is not None:
await self._connection.publish(channel, msg)
elif list is not None:
await self._connection.rpush(list, msg)
elif stream is not None:
await self._connection.xadd(stream, {DATA_KEY: msg})
else:
raise AssertionError("unreachable")
if psub is None:
return None
else:
m = None
with timeout_scope(rpc_timeout, raise_timeout):
# skip subscribe message
await psub.get_message(
ignore_subscribe_messages=True,
timeout=rpc_timeout or 0.0,
)
# get real response
m = await psub.get_message(
ignore_subscribe_messages=True,
timeout=rpc_timeout or 0.0,
)
await psub.unsubscribe()
await psub.aclose() # type: ignore[attr-defined]
if m is None:
if raise_timeout:
raise TimeoutError()
else:
return None
else:
return await self._decoder(await self._parser(m))