Context Fields Declaration  You can also store your own objects in the Context.
 Global  To declare an application-level context field, you need to call the context.set_global method with a key to indicate where the object will be placed in the context.
 AIOKafka Confluent RabbitMQ NATS Redis 
   from  faststream  import  FastStream ,  ContextRepo ,  Context 
from  faststream.kafka  import  KafkaBroker 
# AIOKafka broker and the application built on top of it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Publish the application-level secret into the global context.

    Registered with ``app.on_startup``; stores the string
    ``"my-perfect-secret"`` under the ``"secret_str"`` key.
    """
    context.set_global("secret_str", "my-perfect-secret")
 
   from  faststream  import  FastStream ,  ContextRepo ,  Context 
from  faststream.confluent  import  KafkaBroker 
# Confluent Kafka broker and the application built on top of it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Publish the application-level secret into the global context.

    Registered with ``app.on_startup``; stores the string
    ``"my-perfect-secret"`` under the ``"secret_str"`` key.
    """
    context.set_global("secret_str", "my-perfect-secret")
 
   from  faststream  import  FastStream ,  ContextRepo ,  Context 
from  faststream.rabbit  import  RabbitBroker 
# RabbitMQ broker and the application built on top of it.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Publish the application-level secret into the global context.

    Registered with ``app.on_startup``; stores the string
    ``"my-perfect-secret"`` under the ``"secret_str"`` key.
    """
    context.set_global("secret_str", "my-perfect-secret")
 
   from  faststream  import  FastStream ,  ContextRepo ,  Context 
from  faststream.nats  import  NatsBroker 
# NATS broker and the application built on top of it.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Publish the application-level secret into the global context.

    Registered with ``app.on_startup``; stores the string
    ``"my-perfect-secret"`` under the ``"secret_str"`` key.
    """
    context.set_global("secret_str", "my-perfect-secret")
 
   from  faststream  import  FastStream ,  ContextRepo ,  Context 
from  faststream.redis  import  RedisBroker 
# Redis broker and the application built on top of it.
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Publish the application-level secret into the global context.

    Registered with ``app.on_startup``; stores the string
    ``"my-perfect-secret"`` under the ``"secret_str"`` key.
    """
    context.set_global("secret_str", "my-perfect-secret")
 
    Afterward, you can access your secret field in the usual way:
 AIOKafka Confluent RabbitMQ NATS Redis 
   @broker.subscriber("test-topic")
   async def handle(
       msg: str,
       secret_str: str = Context(),
   ):
       """Consume a message from ``"test-topic"`` and check the global secret.

       ``secret_str`` is declared with ``Context()``, so it is expected to be
       filled from the global context field of the same name.
       """
       expected_secret = "my-perfect-secret"
       assert secret_str == expected_secret
   @broker.subscriber("test-topic")
   async def handle(
       msg: str,
       secret_str: str = Context(),
   ):
       """Consume a message from ``"test-topic"`` and check the global secret.

       ``secret_str`` is declared with ``Context()``, so it is expected to be
       filled from the global context field of the same name.
       """
       expected_secret = "my-perfect-secret"
       assert secret_str == expected_secret
   @broker.subscriber("test-queue")
   async def handle(
       msg: str,
       secret_str: str = Context(),
   ):
       """Consume a message from ``"test-queue"`` and check the global secret.

       ``secret_str`` is declared with ``Context()``, so it is expected to be
       filled from the global context field of the same name.
       """
       expected_secret = "my-perfect-secret"
       assert secret_str == expected_secret
   @broker.subscriber("test-subject")
   async def handle(
       msg: str,
       secret_str: str = Context(),
   ):
       """Consume a message from ``"test-subject"`` and check the global secret.

       ``secret_str`` is declared with ``Context()``, so it is expected to be
       filled from the global context field of the same name.
       """
       expected_secret = "my-perfect-secret"
       assert secret_str == expected_secret
   @broker.subscriber("test-channel")
   async def handle(
       msg: str,
       secret_str: str = Context(),
   ):
       """Consume a message from ``"test-channel"`` and check the global secret.

       ``secret_str`` is declared with ``Context()``, so it is expected to be
       filled from the global context field of the same name.
       """
       expected_secret = "my-perfect-secret"
       assert secret_str == expected_secret
    In this case, the field becomes a global context field: it does not depend on the current message handler (unlike the message field).
 To remove a field from the context, use the reset_global method:
 # Remove the field stored under "my_key" from the global context.
 context.reset_global("my_key")
Local  To set a local context (available only within the message processing scope), use the context.scope context manager.
 AIOKafka Confluent RabbitMQ NATS Redis 
   from  faststream  import  Context ,  FastStream ,  apply_types 
from  faststream.kafka  import  KafkaBroker 
from  faststream.kafka.annotations  import  ContextRepo ,  KafkaMessage 
# AIOKafka broker and the application built on top of it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
    context: ContextRepo,
):
    """Expose the message's correlation id as a local context field.

    The field is set through the ``context.scope`` context manager and
    ``call`` is invoked while that scope is active.
    """
    with context.scope("correlation_id", message.correlation_id):
        # No arguments are passed: ``call`` declares everything it needs
        # in its own signature.
        call()


@apply_types
def call(
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message."""
    assert correlation_id == message.correlation_id
   from  faststream  import  Context ,  FastStream ,  apply_types 
from  faststream.confluent  import  KafkaBroker 
from  faststream.confluent.annotations  import  ContextRepo ,  KafkaMessage 
# Confluent Kafka broker and the application built on top of it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
    context: ContextRepo,
):
    """Expose the message's correlation id as a local context field.

    The field is set through the ``context.scope`` context manager and
    ``call`` is invoked while that scope is active.
    """
    with context.scope("correlation_id", message.correlation_id):
        # No arguments are passed: ``call`` declares everything it needs
        # in its own signature.
        call()


@apply_types
def call(
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message."""
    assert correlation_id == message.correlation_id
   from  faststream  import  Context ,  FastStream ,  apply_types 
from  faststream.rabbit  import  RabbitBroker 
from  faststream.rabbit.annotations  import  ContextRepo ,  RabbitMessage 
# RabbitMQ broker and the application built on top of it.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    msg: str,
    message: RabbitMessage,
    context: ContextRepo,
):
    """Expose the message's correlation id as a local context field.

    The field is set through the ``context.scope`` context manager and
    ``call`` is invoked while that scope is active.
    """
    with context.scope("correlation_id", message.correlation_id):
        # No arguments are passed: ``call`` declares everything it needs
        # in its own signature.
        call()


@apply_types
def call(
    message: RabbitMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message."""
    assert correlation_id == message.correlation_id
   from  faststream  import  Context ,  FastStream ,  apply_types 
from  faststream.nats  import  NatsBroker 
from  faststream.nats.annotations  import  ContextRepo ,  NatsMessage 
# NATS broker and the application built on top of it.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    msg: str,
    message: NatsMessage,
    context: ContextRepo,
):
    """Expose the message's correlation id as a local context field.

    The field is set through the ``context.scope`` context manager and
    ``call`` is invoked while that scope is active.
    """
    with context.scope("correlation_id", message.correlation_id):
        # No arguments are passed: ``call`` declares everything it needs
        # in its own signature.
        call()


@apply_types
def call(
    message: NatsMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message."""
    assert correlation_id == message.correlation_id
   from  faststream  import  Context ,  FastStream ,  apply_types 
from  faststream.redis  import  RedisBroker 
from  faststream.redis.annotations  import  ContextRepo ,  RedisMessage 
# Redis broker and the application built on top of it.
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    msg: str,
    message: RedisMessage,
    context: ContextRepo,
):
    """Expose the message's correlation id as a local context field.

    The field is set through the ``context.scope`` context manager and
    ``call`` is invoked while that scope is active.
    """
    with context.scope("correlation_id", message.correlation_id):
        # No arguments are passed: ``call`` declares everything it needs
        # in its own signature.
        call()


@apply_types
def call(
    message: RedisMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message."""
    assert correlation_id == message.correlation_id
    You can also set the context by yourself, and it will remain within the current call stack until you clear it.
 AIOKafka Confluent RabbitMQ NATS Redis 
   from  faststream  import  Context ,  FastStream ,  apply_types ,  context 
from  faststream.kafka  import  KafkaBroker 
from  faststream.kafka.annotations  import  KafkaMessage 
# AIOKafka broker and the application built on top of it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
):
    """Set ``correlation_id`` in the local context by hand and delegate to ``call``."""
    # Keep the value returned by set_local: reset_local takes it back later.
    token = context.set_local("correlation_id", message.correlation_id)
    call(token)


@apply_types
def call(
    tag,
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Verify the manually set ``correlation_id``, then clear it.

    ``tag`` is the value ``handle`` received from ``context.set_local``.
    """
    assert correlation_id == message.correlation_id
    # The field is not cleared automatically, so reset it explicitly.
    context.reset_local("correlation_id", tag)
 
   from  faststream  import  Context ,  FastStream ,  apply_types ,  context 
from  faststream.confluent  import  KafkaBroker 
from  faststream.confluent.annotations  import  KafkaMessage 
# Confluent Kafka broker and the application built on top of it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
):
    """Set ``correlation_id`` in the local context by hand and delegate to ``call``."""
    # Keep the value returned by set_local: reset_local takes it back later.
    token = context.set_local("correlation_id", message.correlation_id)
    call(token)


@apply_types
def call(
    tag,
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Verify the manually set ``correlation_id``, then clear it.

    ``tag`` is the value ``handle`` received from ``context.set_local``.
    """
    assert correlation_id == message.correlation_id
    # The field is not cleared automatically, so reset it explicitly.
    context.reset_local("correlation_id", tag)
 
   from  faststream  import  Context ,  FastStream ,  apply_types ,  context 
from  faststream.rabbit  import  RabbitBroker 
from  faststream.rabbit.annotations  import  RabbitMessage 
# RabbitMQ broker and the application built on top of it.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    msg: str,
    message: RabbitMessage,
):
    """Set ``correlation_id`` in the local context by hand and delegate to ``call``."""
    # Keep the value returned by set_local: reset_local takes it back later.
    token = context.set_local("correlation_id", message.correlation_id)
    call(token)


@apply_types
def call(
    tag,
    message: RabbitMessage,
    correlation_id=Context(),
):
    """Verify the manually set ``correlation_id``, then clear it.

    ``tag`` is the value ``handle`` received from ``context.set_local``.
    """
    assert correlation_id == message.correlation_id
    # The field is not cleared automatically, so reset it explicitly.
    context.reset_local("correlation_id", tag)
 
   from  faststream  import  Context ,  FastStream ,  apply_types ,  context 
from  faststream.nats  import  NatsBroker 
from  faststream.nats.annotations  import  NatsMessage 
# NATS broker and the application built on top of it.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    msg: str,
    message: NatsMessage,
):
    """Set ``correlation_id`` in the local context by hand and delegate to ``call``."""
    # Keep the value returned by set_local: reset_local takes it back later.
    token = context.set_local("correlation_id", message.correlation_id)
    call(token)


@apply_types
def call(
    tag,
    message: NatsMessage,
    correlation_id=Context(),
):
    """Verify the manually set ``correlation_id``, then clear it.

    ``tag`` is the value ``handle`` received from ``context.set_local``.
    """
    assert correlation_id == message.correlation_id
    # The field is not cleared automatically, so reset it explicitly.
    context.reset_local("correlation_id", tag)
 
   from  faststream  import  Context ,  FastStream ,  apply_types ,  context 
from  faststream.redis  import  RedisBroker 
from  faststream.redis.annotations  import  RedisMessage 
# Redis broker and the application built on top of it.
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    msg: str,
    message: RedisMessage,
):
    """Set ``correlation_id`` in the local context by hand and delegate to ``call``."""
    # Keep the value returned by set_local: reset_local takes it back later.
    token = context.set_local("correlation_id", message.correlation_id)
    call(token)


@apply_types
def call(
    tag,
    message: RedisMessage,
    correlation_id=Context(),
):
    """Verify the manually set ``correlation_id``, then clear it.

    ``tag`` is the value ``handle`` received from ``context.set_local``.
    """
    assert correlation_id == message.correlation_id
    # The field is not cleared automatically, so reset it explicitly.
    context.reset_local("correlation_id", tag)