Skip to content

RedisFastProducer

faststream.redis.producer.RedisFastProducer #

RedisFastProducer(
    connection: Redis[bytes],
    parser: Union[
        None,
        AsyncCustomParser[OneMessage, OneRedisMessage],
        AsyncCustomParser[BatchMessage, BatchRedisMessage],
    ],
    decoder: Union[
        None,
        AsyncCustomDecoder[OneRedisMessage],
        AsyncCustomDecoder[BatchRedisMessage],
    ],
)
Source code in faststream/redis/producer.py
def __init__(
    self,
    connection: "Redis[bytes]",
    parser: Union[
        None,
        AsyncCustomParser[OneMessage, OneRedisMessage],
        AsyncCustomParser[BatchMessage, BatchRedisMessage],
    ],
    decoder: Union[
        None,
        AsyncCustomDecoder[OneRedisMessage],
        AsyncCustomDecoder[BatchRedisMessage],
    ],
) -> None:
    """Bind the producer to a Redis connection and set up reply handling.

    Args:
        connection: Redis client used for every publish operation.
        parser: Optional custom parser; handed to ``resolve_custom_func``
            together with ``RedisParser.parse_message``.
        decoder: Optional custom decoder; handed to ``resolve_custom_func``
            together with ``RedisParser.decode_message``.
    """
    self._connection = connection

    # The two resolutions are independent of each other; the results are
    # the callables applied to RPC replies in ``publish``.
    self._decoder = resolve_custom_func(decoder, RedisParser.decode_message)
    self._parser = resolve_custom_func(
        parser,  # type: ignore[arg-type,assignment]
        RedisParser.parse_message,
    )

publish async #

publish(
    message: SendableMessage,
    channel: Optional[str] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False
) -> Optional[DecodedMessage]
Source code in faststream/redis/producer.py
async def publish(
    self,
    message: SendableMessage,
    channel: Optional[str] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[DecodedMessage]:
    """Send ``message`` to a Redis channel, list or stream.

    The destination is picked in the order ``channel``, ``list``,
    ``stream``; the first one that is not ``None`` is used. With
    ``rpc=True`` a temporary pub/sub subscription on a freshly generated
    reply channel is opened before sending, and the reply received on it
    is parsed, decoded and returned.

    Args:
        message: Payload to encode and send.
        channel: Pub/sub channel to publish to.
        reply_to: Reply destination embedded in the encoded message.
            Must be empty when ``rpc`` is true.
        headers: Optional headers embedded in the encoded message.
        correlation_id: Optional correlation id embedded in the message.
        list: Redis list to ``RPUSH`` the message to.
        stream: Redis stream to ``XADD`` the message to.
        rpc: Wait for a reply on a temporary channel.
        rpc_timeout: Seconds to wait for the reply; a falsy value is
            passed to ``get_message`` as ``0.0``.
        raise_timeout: Raise instead of returning ``None`` when no reply
            was received.

    Returns:
        The decoded reply in RPC mode, otherwise ``None``. ``None`` is
        also returned in RPC mode when no reply arrived and
        ``raise_timeout`` is false.

    Raises:
        ValueError: If none of ``channel``, ``list``, ``stream`` is set.
        TimeoutError: If no reply arrived and ``raise_timeout`` is true.
        WRONG_PUBLISH_ARGS: If ``rpc`` is true and ``reply_to`` is given.
    """
    if not any((channel, list, stream)):
        raise ValueError(INCORRECT_SETUP_MSG)

    psub: Optional[PubSub] = None
    if rpc is True:
        if reply_to:
            raise WRONG_PUBLISH_ARGS

        reply_to = str(uuid4())
        psub = self._connection.pubsub()

    response = None
    try:
        if psub is not None:
            # Subscribe before sending so the reply cannot be missed.
            await psub.subscribe(reply_to)

        msg = RawMessage.encode(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
        )

        if channel is not None:
            await self._connection.publish(channel, msg)
        elif list is not None:
            await self._connection.rpush(list, msg)
        elif stream is not None:
            await self._connection.xadd(stream, {DATA_KEY: msg})
        else:
            raise AssertionError("unreachable")

        if psub is None:
            return None

        with timeout_scope(rpc_timeout, raise_timeout):
            # skip subscribe message
            await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=rpc_timeout or 0.0,
            )

            # get real response
            response = await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=rpc_timeout or 0.0,
            )

    finally:
        # Release the temporary subscription on every exit path, not only
        # on success: a failed send or a raised timeout must not leak it.
        if psub is not None:
            try:
                await psub.unsubscribe()
            finally:
                await psub.aclose()  # type: ignore[attr-defined]

    if response is None:
        if raise_timeout:
            raise TimeoutError()
        return None

    return await self._decoder(await self._parser(response))

publish_batch async #

publish_batch(*msgs: SendableMessage, list: str) -> None
Source code in faststream/redis/producer.py
async def publish_batch(
    self,
    *msgs: SendableMessage,
    list: str,
) -> None:
    """Append several messages to a Redis list with a single ``RPUSH``.

    Args:
        *msgs: Messages to send; each is passed through ``encode_message``
            and the first element of its result is pushed.
        list: Name of the Redis list to push to.

    Calling this with no messages is a no-op.
    """
    if not msgs:
        # RPUSH requires at least one element; sending the command with
        # none would only produce a server-side error.
        return

    batch = (encode_message(msg)[0] for msg in msgs)
    await self._connection.rpush(list, *batch)