async def publish(
self,
message: SendableMessage,
subject: str,
reply_to: str = "",
headers: Optional[Dict[str, str]] = None,
correlation_id: Optional[str] = None,
*,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
) -> Optional[DecodedMessage]:
payload, content_type = encode_message(message)
headers_to_send = {
"content-type": content_type or "",
"correlation_id": correlation_id or str(uuid4()),
**(headers or {}),
}
client = self._connection
if rpc:
if reply_to:
raise WRONG_PUBLISH_ARGS
token = client._nuid.next()
token.extend(token_hex(2).encode())
reply_to = token.decode()
future: asyncio.Future[Msg] = asyncio.Future()
sub = await client.subscribe(reply_to, future=future, max_msgs=1)
await sub.unsubscribe(limit=1)
await client.publish(
subject=subject,
payload=payload,
reply=reply_to,
headers=headers_to_send,
)
if rpc:
msg: Any = None
with timeout_scope(rpc_timeout, raise_timeout):
msg = await future
if msg: # pragma: no branch
if msg.headers: # pragma: no cover
if (
msg.headers.get(nats.js.api.Header.STATUS)
== nats.aio.client.NO_RESPONDERS_STATUS
):
raise nats.errors.NoRespondersError
return await self._decoder(await self._parser(msg))
return None