TestRabbitBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
)
  Bases: TestBroker[RabbitBroker]
  Source code in faststream/broker/test.py
 |  | def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
) -> None:
    self.with_real = with_real
    self.broker = broker
    if connect_only is None:
        try:
            connect_only = is_contains_context_name(
                self.__class__.__name__,
                TestApp.__name__,
            )
        except Exception as e:
            warnings.warn(
                (
                    f"\nError `{repr(e)}` occured at `{self.__class__.__name__}` AST parsing"
                    "\nPlease, report us by creating an Issue with your TestClient usecase"
                    "\nhttps://github.com/airtai/faststream/issues/new?labels=bug&template=bug_report.md&title=Bug:%20TestClient%20AST%20parsing"
                ),
                category=RuntimeWarning,
                stacklevel=1,
            )
            connect_only = False
    self.connect_only = connect_only
 | 
     broker  instance-attribute  
   
     connect_only  instance-attribute  
 connect_only = connect_only
 
     with_real  instance-attribute  
   
     create_publisher_fake_subscriber  staticmethod  
 create_publisher_fake_subscriber(
    broker: RabbitBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
  Source code in faststream/rabbit/test.py
 |  | @staticmethod
def create_publisher_fake_subscriber(
    broker: RabbitBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    @broker.subscriber(
        queue=publisher.queue,
        exchange=publisher.exchange,
        _raw=True,
    )
    def f(msg: Any) -> None:
        pass
    return f
 | 
        patch_publisher  staticmethod  
 patch_publisher(
    broker: RabbitBroker, publisher: Any
) -> None
  Source code in faststream/rabbit/test.py
 |  | @staticmethod
def patch_publisher(broker: RabbitBroker, publisher: Any) -> None:
    publisher._producer = broker._producer
 | 
        remove_publisher_fake_subscriber  staticmethod  
 remove_publisher_fake_subscriber(
    broker: RabbitBroker, publisher: Publisher
) -> None
  Source code in faststream/rabbit/test.py
 |  | @staticmethod
def remove_publisher_fake_subscriber(
    broker: RabbitBroker,
    publisher: Publisher,
) -> None:
    broker.handlers.pop(
        publisher._get_routing_hash(),
        None,
    )
 |