Skip to content

PubSub

faststream.redis.PubSub #

PubSub(
    channel: str,
    pattern: bool = False,
    polling_interval: PositiveFloat = 1.0,
)

Bases: NameRequired

Source code in faststream/redis/schemas.py
def __init__(
    self,
    channel: str,
    pattern: bool = False,
    polling_interval: PositiveFloat = 1.0,
) -> None:
    """Build the schema from a channel string.

    Args:
        channel: Channel string. It is passed through ``compile_path``
            (with ``"*"`` as the replacement symbol) and the resulting
            path is stored as ``name``.
        pattern: Pattern flag forwarded to the parent initializer. It is
            forced to True whenever ``compile_path`` returns a regex.
        polling_interval: Forwarded unchanged to the parent initializer.

    """
    compiled_regex, resolved_name = compile_path(channel, replace_symbol="*")

    super().__init__(
        name=resolved_name,
        path_regex=compiled_regex,
        # A compiled regex means the channel must be handled as a pattern,
        # regardless of what the caller passed.
        pattern=True if compiled_regex is not None else pattern,
        polling_interval=polling_interval,
    )

last_id class-attribute instance-attribute #

last_id: str = '$'

model_config class-attribute instance-attribute #

model_config = {'arbitrary_types_allowed': True}

name class-attribute instance-attribute #

name: str = Field(...)

path_regex class-attribute instance-attribute #

path_regex: Optional[Pattern[str]] = None

pattern class-attribute instance-attribute #

pattern: bool = False

polling_interval class-attribute instance-attribute #

polling_interval: PositiveFloat = 1.0

Config #

arbitrary_types_allowed class-attribute instance-attribute #

arbitrary_types_allowed = True

validate classmethod #

validate(
    value: Union[str, NameRequiredCls, None], **kwargs: Any
) -> Optional[NameRequiredCls]

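Validates a value: a string is converted into an instance of the class (any extra keyword arguments are passed to the constructor); any other value, including None, is returned unchanged.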

PARAMETER DESCRIPTION
value

The value to be validated.

TYPE: Union[str, NameRequiredCls, None]

RETURNS DESCRIPTION
Optional[NameRequiredCls]

The validated value: a new instance built from the string, or the input value unchanged (which may be None).

Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Coerce a string into an instance of ``cls``; pass anything else through.

    Args:
        value: A string to build an instance from, an already-built
            object, or None.
        **kwargs: Extra keyword arguments forwarded to ``cls`` when a new
            instance is created from a string.

    Returns:
        ``cls(value, **kwargs)`` if ``value`` is a string; otherwise
        ``value`` unchanged (which may be None).

    """
    # None is never a str, so one isinstance check covers both the
    # "missing" and the "already an object" cases.
    if not isinstance(value, str):
        return value
    return cls(value, **kwargs)