KafkaMessage(
*args: Any,
consumer: aiokafka.AIOKafkaConsumer,
is_manual: bool = False,
**kwargs: Any
)
Bases: StreamMessage[ConsumerRecord]
Represents a Kafka message in the FastStream framework.
This class extends StreamMessage
and is specialized for handling Kafka ConsumerRecord objects.
METHOD | DESCRIPTION |
ack | Acknowledge the Kafka message. |
nack | Negatively acknowledge the Kafka message. |
reject | Reject the Kafka message. |
Source code in faststream/kafka/message.py
def __init__(
    self,
    *args: Any,
    consumer: aiokafka.AIOKafkaConsumer,
    is_manual: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize the message and attach its Kafka consumer.

    Args:
        *args: Positional arguments forwarded unchanged to the parent
            initializer.
        consumer: The ``aiokafka.AIOKafkaConsumer`` stored on the instance
            as ``self.consumer``.
        is_manual: Flag stored on the instance as ``self.is_manual``.
            Defaults to ``False``.
        **kwargs: Keyword arguments forwarded unchanged to the parent
            initializer.
    """
    # The parent initializer runs first; the Kafka-specific attributes
    # are attached afterwards.
    super().__init__(*args, **kwargs)
    self.consumer = consumer
    self.is_manual = is_manual
commited class-attribute
instance-attribute
commited: bool = field(default=False, init=False)
consumer instance-attribute
content_type class-attribute
instance-attribute
content_type: Optional[str] = None
correlation_id class-attribute
instance-attribute
correlation_id: str = field(
default_factory=lambda: str(uuid4())
)
decoded_body class-attribute
instance-attribute
decoded_body: Optional[DecodedMessage] = None
headers class-attribute
instance-attribute
headers: AnyDict = field(default_factory=dict)
is_manual instance-attribute
message_id class-attribute
instance-attribute
message_id: str = field(
default_factory=lambda: str(uuid4())
)
path class-attribute
instance-attribute
path: AnyDict = field(default_factory=dict)
processed class-attribute
instance-attribute
processed: bool = field(default=False, init=False)
raw_message instance-attribute
reply_to class-attribute
instance-attribute
ack async
ack(**kwargs: Any) -> None
Acknowledge the Kafka message.
PARAMETER | DESCRIPTION |
**kwargs | Additional keyword arguments. TYPE: Any DEFAULT: {} |
RETURNS | DESCRIPTION |
None | This method does not return a value. TYPE: None |
Source code in faststream/kafka/message.py
| async def ack(self, **kwargs: Any) -> None:
"""
Acknowledge the Kafka message.
The consumer commit is awaited only when ``self.is_manual`` is true and
``self.commited`` is still false.
Args:
**kwargs (Any): Additional keyword arguments. The body does not forward
them to ``self.consumer.commit()`` or to ``super().ack()``.
Returns:
None: This method does not return a value.
"""
# NOTE(review): this listing has lost its indentation, so it does not show
# whether the ``super().ack()`` call below belongs to the ``if`` body or
# runs unconditionally -- confirm against faststream/kafka/message.py.
if self.is_manual and not self.commited:
await self.consumer.commit()
await super().ack()
|
nack async
nack(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def nack(self, **kwargs: Any) -> None:
    """Flag the message as committed.

    This implementation only sets ``self.commited`` to ``True``.

    Args:
        **kwargs: Accepted for interface compatibility; not used by the body.
    """
    self.commited = True
reject async
reject(**kwargs: Any) -> None
Source code in faststream/broker/message.py
async def reject(self, **kwargs: Any) -> None:
    """Flag the message as committed.

    This implementation only sets ``self.commited`` to ``True``.

    Args:
        **kwargs: Accepted for interface compatibility; not used by the body.
    """
    self.commited = True