Bases: StreamMessage[MsgType]
commited class-attribute
instance-attribute
commited: bool = field(default=False, init=False)
content_type class-attribute
instance-attribute
content_type: Optional[str] = None
correlation_id class-attribute
instance-attribute
correlation_id: str = field(
default_factory=lambda: str(uuid4())
)
decoded_body class-attribute
instance-attribute
decoded_body: Optional[DecodedMessage] = None
headers class-attribute
instance-attribute
headers: AnyDict = field(default_factory=dict)
message_id class-attribute
instance-attribute
message_id: str = field(
default_factory=lambda: str(uuid4())
)
path class-attribute
instance-attribute
path: AnyDict = field(default_factory=dict)
processed class-attribute
instance-attribute
processed: bool = field(default=False, init=False)
raw_message instance-attribute
reply_to class-attribute
instance-attribute
ack async
ack(redis: Redis[bytes], **kwargs: Any) -> None
Source code in faststream/redis/message.py
| @override
async def ack(  # type: ignore[override]
    self,
    redis: "Redis[bytes]",
    **kwargs: Any,
) -> None:
    """Acknowledge this message, sending ``XACK`` first when it applies.

    ``XACK`` is sent only if all of the following hold, checked in order:
    the message is not yet marked ``commited``, ``raw_message`` has a
    truthy ``"message_ids"`` entry, the local context provides a truthy
    ``"handler_"``, that handler has a truthy ``stream_sub``, and that
    ``stream_sub`` has a truthy ``group``. The parent class ``ack`` is
    awaited afterwards in every case.

    Args:
        redis: Client used to send ``XACK``.
        **kwargs: Accepted by the signature; not used in this method.
    """
    # Each lookup runs only when the previous one produced a truthy value,
    # so a missing piece short-circuits the rest of the chain.
    entry_ids = None if self.commited else self.raw_message.get("message_ids")
    subscriber = context.get_local("handler_") if entry_ids else None
    stream_sub = subscriber.stream_sub if subscriber else None
    consumer_group = stream_sub.group if stream_sub else None

    if consumer_group:
        await redis.xack(  # type: ignore[no-untyped-call]
            self.raw_message["channel"],
            consumer_group,
            *entry_ids,
        )

    await super().ack()
|
nack async
nack(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def nack(self, **kwargs: Any) -> None:
    """Flag the message as handled by setting ``commited`` to ``True``.

    Nothing else happens here: the method only updates the flag on the
    instance.

    Args:
        **kwargs: Accepted by the signature; not used in this method.
    """
    self.commited = True
|
reject async
reject(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def reject(self, **kwargs: Any) -> None:
    """Flag the message as handled by setting ``commited`` to ``True``.

    Nothing else happens here: the method only updates the flag on the
    instance.

    Args:
        **kwargs: Accepted by the signature; not used in this method.
    """
    self.commited = True
|