When a customer clicks "Buy Now," that order must be processed even if the inventory system is slow, the payment gateway times out, or a server crashes mid-transaction. E-commerce platforms, banks, and logistics companies solve this with message queues: systems that ensure every transaction is delivered and processed at least once, in order, and, with idempotent consumers, effectively exactly once.
This guide covers the patterns that power order processing, payment systems, and event-driven architectures at scale.
Why Message Brokers?

Direct service-to-service communication creates tight coupling:
- Services must know each other's locations
- Failures cascade through the system
- Scaling requires complex load balancing

Message brokers provide:
- Decoupling: Services communicate through queues
- Reliability: Messages persist until processed
- Scalability: Add consumers to handle load
- Buffering: Handle traffic spikes gracefully

Core Concepts

Producers, Exchanges, Queues, and Consumers

Producer → Exchange → Queue → Consumer
- Producer: Sends messages to an exchange
- Exchange: Routes messages to queues based on rules
- Queue: Stores messages until consumed
- Consumer: Receives and processes messages

Pattern 1: Direct Exchange (Point-to-Point)

Messages route to queues by exact routing key match:
import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Send message with routing key
channel.basic_publish(
    exchange='direct_logs',
    routing_key='error',
    body='Error: Database connection failed'
)

print("Sent error log")
connection.close()
Consumer:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Create queue and bind to exchange with routing key
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

def callback(ch, method, properties, body):
    print(f"Received: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
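
Because routing is by exact key match, a consumer only receives the severities it binds for. As a rough sketch (the 'info' routing key here is an illustrative addition, not part of the example above), a second consumer could bind the same exchange with a different key and never see the error messages:

# Hypothetical second consumer: same exchange, but bound only to 'info'
result = channel.queue_declare(queue='', exclusive=True)
info_queue = result.method.queue
channel.queue_bind(exchange='direct_logs', queue=info_queue, routing_key='info')

def info_callback(ch, method, properties, body):
    # Never sees messages published with routing_key='error'
    print(f"Info: {body.decode()}")

channel.basic_consume(queue=info_queue, on_message_callback=info_callback, auto_ack=True)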
Pattern 2: Fanout Exchange (Pub/Sub)

Messages broadcast to all bound queues:
# Publisher
channel.exchange_declare(exchange='notifications', exchange_type='fanout')

channel.basic_publish(
    exchange='notifications',
    routing_key='',  # Ignored for fanout
    body='New order placed: #12345'
)
Multiple services can subscribe:
# Email service
channel.queue_bind(exchange='notifications', queue='email_queue')

# SMS service
channel.queue_bind(exchange='notifications', queue='sms_queue')

# Analytics service
channel.queue_bind(exchange='notifications', queue='analytics_queue')
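
Each subscriber still needs to declare its own queue and consume from it. A minimal sketch for the email service, assuming the bindings above (the handler body is illustrative):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='notifications', exchange_type='fanout')
channel.queue_declare(queue='email_queue', durable=True)
channel.queue_bind(exchange='notifications', queue='email_queue')

def send_email(ch, method, properties, body):
    # Illustrative handler: in practice this would call your email provider
    print(f"Emailing customer about: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='email_queue', on_message_callback=send_email)
channel.start_consuming()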
Pattern 3: Topic Exchange (Pattern Matching)

Route messages based on pattern matching:
channel.exchange_declare(exchange='logs', exchange_type='topic')

# Routing keys follow the format: <facility>.<severity>
# Examples: auth.error, payment.warning, order.info

# Publish
channel.basic_publish(
    exchange='logs',
    routing_key='auth.error',
    body='Authentication failed for user 123'
)

channel.basic_publish(
    exchange='logs',
    routing_key='payment.info',
    body='Payment processed successfully'
)
Consumers can bind with wildcards (* matches exactly one word, # matches zero or more words):
# Receive all errors
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='*.error')

# Receive everything from auth
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='auth.*')

# Receive everything
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='#')
Pattern 4: Work Queues (Load Balancing)

Distribute tasks among workers:
# Task producer
channel.queue_declare(queue='tasks', durable=True)

for i in range(100):
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=f'Task {i}',
        properties=pika.BasicProperties(
            delivery_mode=2  # Persist message
        )
    )
Workers:
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks', durable=True)

# Fair dispatch - don't give a worker more than 1 unacked message at a time
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    print(f"Processing: {body.decode()}")
    # Simulate work
    time.sleep(1)
    # Acknowledge completion
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
Pattern 5: RPC (Request-Reply)

Synchronous-style communication over async transport:
import uuid

import pika

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        # Create callback queue for replies
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        # Only accept the reply that matches our request
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, message):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=message
        )
        # Block until the matching reply arrives
        while self.response is None:
            self.connection.process_data_events()
        return self.response

# Usage: the server below expects the request body to be an integer
client = RpcClient()
response = client.call('30')
print(f"Result: {response.decode()}")
RPC Server:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def fibonacci(n):
    # Iterative Fibonacci
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

def on_request(ch, method, props, body):
    n = int(body)
    result = fibonacci(n)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=str(result)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
channel.start_consuming()
Dead Letter Queues

Handle failed messages gracefully:
# Main queue with dead letter exchange
channel.queue_declare(
    queue='orders',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed_orders',
        'x-message-ttl': 60000  # Optional: expire after 60s
    }
)

# Dead letter exchange and queue
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='failed_orders')
channel.queue_bind(exchange='dlx', queue='failed_orders', routing_key='failed_orders')
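
Messages reach the dead letter queue when they are rejected without requeueing, expire, or overflow the queue. A minimal sketch of both sides, assuming the declarations above (process_order and the handlers are illustrative):

# Rejecting a message on 'orders' without requeueing routes it to 'failed_orders'
def handle_order(ch, method, properties, body):
    try:
        process_order(body)  # hypothetical business logic
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# A separate consumer can inspect or retry dead-lettered messages
def handle_failed_order(ch, method, properties, body):
    print(f"Dead-lettered order: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='orders', on_message_callback=handle_order)
channel.basic_consume(queue='failed_orders', on_message_callback=handle_failed_order)
channel.start_consuming()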
Best Practices

1. Always Acknowledge Messages
def callback(ch, method, properties, body):
    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Reject without requeueing (routes to a dead letter exchange, if configured)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
2. Make Messages Persistent
properties = pika.BasicProperties(
    delivery_mode=2,  # Persistent
    content_type='application/json'
)
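
Persistence only helps if the queue itself is durable; both settings are needed for a message to survive a broker restart. A short sketch publishing with the properties above (the queue name and payload are illustrative):

import json

# The queue must be declared durable as well
channel.queue_declare(queue='payments', durable=True)

channel.basic_publish(
    exchange='',
    routing_key='payments',
    body=json.dumps({'order_id': 12345, 'amount': 49.99}),
    properties=properties
)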
3. Use Publisher Confirms
channel.confirm_delivery()

try:
    channel.basic_publish(...)
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
4. Monitor Queue Depth
rabbitmqctl list_queues name messages consumers
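
The same check can be done from application code; with pika, a passive queue declaration returns the current message and consumer counts without creating or modifying the queue (the alert threshold below is arbitrary):

# Passive declare: fails if the queue doesn't exist, otherwise returns its stats
status = channel.queue_declare(queue='tasks', passive=True)
depth = status.method.message_count
consumers = status.method.consumer_count

if depth > 10000:
    print(f"Backlog warning: {depth} messages waiting, {consumers} consumers attached")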
Conclusion

RabbitMQ patterns enable:

- Decoupled architectures with pub/sub
- Scalable processing with work queues
- Reliable communication with acknowledgments
- Flexible routing with topic exchanges

At Sajima Solutions, we design message-driven architectures that scale and remain resilient. Contact us to architect your distributed system.