Publishing#
FastStream RabbitBroker supports all regular publishing use cases, and you can use them without any changes.
However, if you want to customize the publishing logic further, take a closer look at the RabbitBroker-specific parameters.
Rabbit Publishing#
RabbitBroker also uses the unified publish method (from a publisher object) to send messages.
In this case, however, the message can also be an aio_pika.Message object, if you need one, in addition to Python primitives and pydantic.BaseModel instances.
You can specify the queue (used as the routing_key) and, optionally, the exchange to send to by name.
import asyncio
from faststream.rabbit import RabbitBroker

async def pub():
    async with RabbitBroker() as broker:
        await broker.publish(
            "Hi!",
            queue="test",
            exchange="test"
        )

asyncio.run(pub())
If you don't specify an exchange, the message will be sent to the default one.
You can also pass special RabbitQueue and RabbitExchange objects as the queue and exchange arguments:
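import asyncio
from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue

async def pub():
    async with RabbitBroker() as broker:
        await broker.publish(
            "Hi!",
            queue=RabbitQueue("test"),
            exchange=RabbitExchange("test")
        )

asyncio.run(pub())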
If you specify an exchange that doesn't exist, RabbitBroker will create it and then publish the message to it.
Tip
Be careful with this: if you have already created an exchange with specific parameters and then send a message to it by exchange name only, the broker will try to create it again, so an exchange parameters conflict will occur.
If you are sending a message to a specific exchange, passing a defined RabbitExchange object is the preferred way.
Basic Arguments#
The publish method takes the following arguments:
message = "" - message to send
exchange: str | RabbitExchange | None = None - the exchange where the message will be sent to. If not specified, the default one is used
queue: str | RabbitQueue = "" - the queue where the message will be sent (since most queues use their name as the routing key, this is a human-readable version of routing_key)
routing_key: str = "" - also a message routing key; if not specified, the queue argument will be used
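The relationship between queue and routing_key described above can be pictured with a small sketch. The effective_routing_key function is a hypothetical helper written for illustration only, not FastStream's actual code, and it handles plain string names only.

```python
def effective_routing_key(queue: str = "", routing_key: str = "") -> str:
    """Return the key a message would be routed with (illustrative only).

    Mirrors the documented rule: an explicit routing_key is used as given,
    otherwise the queue name is used as the routing key.
    """
    return routing_key or queue
```

For example, effective_routing_key(queue="test") gives "test", while passing routing_key="logs.error" as well gives "logs.error".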
Message Parameters#
You can read more about all the available flags in the RabbitMQ documentation.
headers: dict[str, Any] | None = None - message headers (used by consumers)
content_type: str | None = None - the content_type of the message being sent (set automatically, used by consumers)
content_encoding: str | None = None - encoding of the message (used by consumers)
persist: bool = False - whether to restore the message after a RabbitMQ reboot
priority: int | None = None - the priority of the message
correlation_id: str | None = None - message ID that helps to match the original message with the reply to it (generated automatically)
message_id: str | None = None - message ID (generated automatically)
timestamp: int | float | timedelta | datetime | None = None - message sending time (set automatically)
expiration: int | float | timedelta | datetime | None = None - message lifetime (in seconds)
type: str | None = None - the type of message (used by consumers)
user_id: str | None = None - ID of the RabbitMQ user who sent the message
app_id: str | None = None - ID of the application that sent the message (used by consumers)
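A minimal sketch of how several of these parameters might be combined in one call. It uses only the keyword arguments listed above; the "orders" queue, the header key, the app_id value, and both helper functions are hypothetical, and broker is assumed to be an already connected RabbitBroker.

```python
from datetime import timedelta
from typing import Any


def order_message_options(order_id: str) -> dict[str, Any]:
    """Collect message parameters for one order (hypothetical helper)."""
    return {
        "headers": {"x-order-id": order_id},  # custom header for consumers
        "persist": True,                      # keep the message across a reboot
        "priority": 5,
        "expiration": timedelta(minutes=1),   # message lifetime
        "correlation_id": order_id,           # lets a reply be matched later
        "app_id": "orders-service",
    }


async def publish_order(broker, payload: dict[str, Any], order_id: str) -> None:
    """Publish `payload` to the assumed "orders" queue with the options above."""
    # `broker` is assumed to be a connected faststream.rabbit.RabbitBroker.
    await broker.publish(payload, queue="orders", **order_message_options(order_id))
```

Building the options in a separate function keeps them easy to inspect and reuse without a running broker.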
Send Flags#
Arguments for sending a message:
mandatory: bool = True - the client waits for confirmation that the message will be placed in some queue (if there are no queues, the message is returned to the sender)
immediate: bool = False - the client expects a consumer to be ready to take the message into work "right now" (if there is no consumer, the message is returned to the sender)
timeout: int | float | None = None - how long to wait for the send confirmation from RabbitMQ