Skip to content

TestKafkaBroker

faststream.kafka.TestKafkaBroker #

TestKafkaBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
)

Bases: TestBroker[KafkaBroker]

Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
) -> None:
    """Store the wrapped broker and the test-mode flags on the instance.

    Args:
        broker: Broker instance; stored on ``self.broker``.
        with_real: Stored unchanged on ``self.with_real``.
        connect_only: Stored on ``self.connect_only``. When ``None``, the
            value is taken from ``is_contains_context_name`` called with
            this class's name and ``TestApp``'s name. If that call raises,
            a ``RuntimeWarning`` is emitted and ``False`` is used instead.
    """
    self.with_real = with_real
    self.broker = broker

    if connect_only is None:
        try:
            connect_only = is_contains_context_name(
                self.__class__.__name__,
                TestApp.__name__,
            )

        except Exception as e:
            # Detection is best-effort: a failure here must not prevent the
            # test client from being constructed, so warn and fall back.
            warnings.warn(
                (
                    f"\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing"
                    "\nPlease, report us by creating an Issue with your TestClient usecase"
                    "\nhttps://github.com/airtai/faststream/issues/new?labels=bug&template=bug_report.md&title=Bug:%20TestClient%20AST%20parsing"
                ),
                category=RuntimeWarning,
                # Attribute the warning to the caller of this constructor
                # rather than to this `warnings.warn` line.
                stacklevel=2,
            )

            connect_only = False

    self.connect_only = connect_only

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

with_real instance-attribute #

with_real = with_real

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(
    broker: KafkaBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/kafka/test.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: KafkaBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    """Register a do-nothing subscriber for the publisher's topic.

    The subscriber is created through ``broker.subscriber`` using the
    publisher's ``topic`` and ``batch`` values, with ``_raw=True``
    (NOTE(review): the meaning of ``_raw`` is not visible here — confirm
    against ``KafkaBroker.subscriber``).

    Args:
        broker: Broker on which the subscriber is registered.
        publisher: Publisher whose ``topic`` and ``batch`` are mirrored.

    Returns:
        Whatever the ``broker.subscriber(...)`` decorator returns when
        applied to the no-op handler.
    """
    register = broker.subscriber(  # type: ignore[call-overload,misc]
        publisher.topic,
        batch=publisher.batch,
        _raw=True,
    )

    # The handler body is intentionally empty: it only needs to exist so the
    # topic has a registered subscriber.
    def f(msg: Any) -> None:
        pass

    wrapped = register(f)
    return wrapped  # type: ignore[no-any-return]

patch_publisher staticmethod #

patch_publisher(
    broker: KafkaBroker, publisher: Any
) -> None
Source code in faststream/kafka/test.py
@staticmethod
def patch_publisher(broker: KafkaBroker, publisher: Any) -> None:
    """Point the publisher's ``_producer`` at the broker's ``_producer``.

    Args:
        broker: Broker whose ``_producer`` attribute is read.
        publisher: Object whose ``_producer`` attribute is overwritten.
    """
    shared_producer = broker._producer
    publisher._producer = shared_producer

remove_publisher_fake_subscriber staticmethod #

remove_publisher_fake_subscriber(
    broker: KafkaBroker, publisher: Publisher
) -> None
Source code in faststream/kafka/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: KafkaBroker,
    publisher: Publisher,
) -> None:
    """Drop the handler stored under the publisher's topic, if there is one.

    Args:
        broker: Broker whose ``handlers`` mapping is modified.
        publisher: Publisher whose ``topic`` is used as the key.
    """
    registered = broker.handlers
    # A default of None makes the removal a no-op when the key is absent.
    registered.pop(publisher.topic, None)