KafkaLoggingBroker(*args, logger=EMPTY, log_level=INFO, log_fmt=None, **kwargs)
  Bases: BrokerUsecase[Union['aiokafka.ConsumerRecord', Tuple['aiokafka.ConsumerRecord', ...]], Callable[..., 'aiokafka.AIOKafkaConsumer']]
 A class that extends the LoggingMixin class and adds additional functionality for logging Kafka related information.
 Initialize the class.
  Source code in faststream/kafka/broker/logging.py
 |  | def __init__(
    self,
    *args: Any,
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    log_fmt: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Initialize the class."""
    super().__init__(
        *args,
        logger=logger,
        # TODO: generate unique logger names to not share between brokers
        default_logger=get_broker_logger(
            name="kafka",
            default_context={
                "topic": "",
                "group_id": "",
            },
            message_id_ln=self.__max_msg_id_ln,
        ),
        log_level=log_level,
        log_fmt=log_fmt,
        **kwargs,
    )
    self._max_topic_len = 4
    self._max_group_len = 0
 | 
     instance-attribute  
   
     instance-attribute  
   
     instance-attribute  
   
     instance-attribute  
   
      instance-attribute  
   
     instance-attribute  
   
     instance-attribute  
   
     instance-attribute  
   
     instance-attribute  
   
      instance-attribute  
   
     
   Prepare all Broker entities to startup.
  Source code in faststream/broker/core/usecase.py
 |  | def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)
    for p in self._publishers.values():
        self.setup_publisher(p)
 | 
        
 add_middleware(middleware)
 Append BrokerMiddleware to the end of middlewares list.
 Current middleware will be used as a most inner of already existed ones.
  Source code in faststream/broker/core/abc.py
 |  | def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.
    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)
    for sub in self._subscribers.values():
        sub.add_middleware(middleware)
    for pub in self._publishers.values():
        pub.add_middleware(middleware)
 | 
        abstractmethod  
    Source code in faststream/broker/core/abc.py
 |  | @abstractmethod
def subscriber(
    self,
    subscriber: "SubscriberProto[MsgType]",
) -> "SubscriberProto[MsgType]":
    subscriber.add_prefix(self.prefix)
    key = hash(subscriber)
    subscriber = self._subscribers.get(key, subscriber)
    self._subscribers = {**self._subscribers, key: subscriber}
    return subscriber
 | 
        
 publisher(*args, **kwargs)
  Source code in faststream/broker/core/usecase.py
 |  | def publisher(self, *args: Any, **kwargs: Any) -> "PublisherProto[MsgType]":
    pub = super().publisher(*args, **kwargs)
    if self.running:
        self.setup_publisher(pub)
    return pub
 | 
        
 include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)
 Includes a router in the current object.
  Source code in faststream/broker/core/abc.py
 |  | def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))
        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema
            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}
    for p in router._publishers.values():
        p.add_prefix(self.prefix)
        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema
            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}
 | 
        
 include_routers(*routers)
 Includes routers in the object.
  Source code in faststream/broker/core/abc.py
 |  | def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)
 | 
        abstractmethod async  
   Start the broker async use case.
  Source code in faststream/broker/core/usecase.py
 |  | @abstractmethod
async def start(self) -> None:
    """Start the broker async use case."""
    self._abc_start()
    await self.connect()
 | 
        async  
   Connect to a remote server.
  Source code in faststream/broker/core/usecase.py
 |  | async def connect(self, **kwargs: Any) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        connection_kwargs = self._connection_kwargs.copy()
        connection_kwargs.update(kwargs)
        self._connection = await self._connect(**connection_kwargs)
    self.setup()
    return self._connection
 | 
        
 setup_subscriber(subscriber, **kwargs)
 Setup the Subscriber to prepare it to starting.
  Source code in faststream/broker/core/usecase.py
 |  | def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Setup the Subscriber to prepare it to starting."""
    data = self._subscriber_setup_extra.copy()
    data.update(kwargs)
    subscriber.setup(**data)
 | 
        
 setup_publisher(publisher, **kwargs)
 Setup the Publisher to prepare it to starting.
  Source code in faststream/broker/core/usecase.py
 |  | def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Setup the Publisher to prepare it to starting."""
    data = self._publisher_setup_extra.copy()
    data.update(kwargs)
    publisher.setup(**data)
 | 
        async  
 close(exc_type=None, exc_val=None, exc_tb=None)
 Closes the object.
  Source code in faststream/broker/core/usecase.py
 |  | async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Closes the object."""
    self.running = False
    for h in self._subscribers.values():
        await h.close()
    if self._connection is not None:
        await self._close(exc_type, exc_val, exc_tb)
 | 
        async  
 publish(msg, *, producer, correlation_id=None, **kwargs)
 Publish message directly.
  Source code in faststream/broker/core/usecase.py
 |  | async def publish(
    self,
    msg: Any,
    *,
    producer: Optional["ProducerProto"],
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional[Any]:
    """Publish message directly."""
    assert producer, NOT_CONNECTED_YET  # nosec B101
    publish = producer.publish
    for m in self._middlewares:
        publish = partial(m(None).publish_scope, publish)
    return await publish(msg, correlation_id=correlation_id, **kwargs)
 | 
        async  
 request(msg, *, producer, correlation_id=None, **kwargs)
 Publish message directly.
  Source code in faststream/broker/core/usecase.py
 |  | async def request(
    self,
    msg: Any,
    *,
    producer: Optional["ProducerProto"],
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Any:
    """Publish message directly."""
    assert producer, NOT_CONNECTED_YET  # nosec B101
    request = producer.request
    for m in self._middlewares:
        request = partial(m(None).publish_scope, request)
    published_msg = await request(
        msg,
        correlation_id=correlation_id,
        **kwargs,
    )
    async with AsyncExitStack() as stack:
        return_msg = return_input
        for m in self._middlewares:
            mid = m(published_msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)
        parsed_msg: StreamMessage[Any] = await producer._parser(published_msg)
        parsed_msg._decoded_body = await producer._decoder(parsed_msg)
        parsed_msg._source_type = SourceType.Response
        return await return_msg(parsed_msg)
 | 
        abstractmethod async  
   Check connection alive.
  Source code in faststream/broker/core/usecase.py
 |  | @abstractmethod
async def ping(self, timeout: Optional[float]) -> bool:
    """Check connection alive."""
    raise NotImplementedError()
 | 
        
    Source code in faststream/kafka/broker/logging.py
 |  | def get_fmt(self) -> str:
    return (
        "%(asctime)s %(levelname)-8s - "
        + f"%(topic)-{self._max_topic_len}s | "
        + (f"%(group_id)-{self._max_group_len}s | " if self._max_group_len else "")
        + f"%(message_id)-{self.__max_msg_id_ln}s "
        + "- %(message)s"
    )
 |