When a customer clicks “Buy Now,” that order must be processed, even if the inventory system is slow, the payment gateway times out, or a server crashes mid-transaction. E-commerce platforms, banks, and logistics companies solve this with message queues: systems that hold each message durably until a consumer confirms it has been processed. In practice this gives at-least-once delivery, so consumers should be written to tolerate an occasional duplicate.

This guide covers the RabbitMQ messaging patterns that power order processing, payment systems, and event-driven architectures at scale, with examples written for the Python pika client.

Why Message Brokers?

Direct service-to-service communication creates tight coupling:

  • Services must know each other’s locations
  • Failures cascade through the system
  • Scaling requires complex load balancing

Message brokers provide:

  • Decoupling: Services communicate through queues
  • Reliability: Messages persist until processed
  • Scalability: Add consumers to handle load
  • Buffering: Handle traffic spikes gracefully

Core Concepts

Producers, Exchanges, Queues, and Consumers

Producer → Exchange → Queue → Consumer

  • Producer: Sends messages to an exchange
  • Exchange: Routes messages to queues based on rules
  • Queue: Stores messages until consumed
  • Consumer: Receives and processes messages
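
The flow described above can be sketched without a broker. The following is a minimal in-memory model, not RabbitMQ itself: `ToyDirectExchange` and the queue names are hypothetical and exist only to show how a producer, exchange, queue, and consumer relate.

```python
from collections import defaultdict, deque


class ToyDirectExchange:
    """In-memory stand-in for an exchange with exact-key routing (illustration only)."""

    def __init__(self):
        self.bindings = defaultdict(list)  # routing key -> names of bound queues
        self.queues = defaultdict(deque)   # queue name -> pending messages

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        # Copy the message into every queue bound with this exact key.
        # Messages whose key has no binding are silently dropped.
        for queue in self.bindings.get(routing_key, []):
            self.queues[queue].append(body)

    def consume(self, queue):
        # Return the oldest pending message, or None if the queue is empty.
        pending = self.queues[queue]
        return pending.popleft() if pending else None


exchange = ToyDirectExchange()
exchange.bind("error_queue", "error")
exchange.publish("error", "Database connection failed")   # routed to error_queue
exchange.publish("info", "Nothing is bound to this key")  # dropped
print(exchange.consume("error_queue"))
```

The producer only ever names an exchange and a routing key; which queues receive the message is decided entirely by the bindings.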

Pattern 1: Direct Exchange (Point-to-Point)

Messages route to queues by exact routing key match:

import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Send message with routing key
channel.basic_publish(
    exchange='direct_logs',
    routing_key='error',
    body='Error: Database connection failed'
)

print("Sent error log")
connection.close()

Consumer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Create queue and bind to exchange with routing key
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

def callback(ch, method, properties, body):
    print(f"Received: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Pattern 2: Fanout Exchange (Pub/Sub)

Messages broadcast to all bound queues:

# Publisher
channel.exchange_declare(exchange='notifications', exchange_type='fanout')

channel.basic_publish(
    exchange='notifications',
    routing_key='',  # Ignored for fanout
    body='New order placed: #12345'
)

Multiple services can subscribe:

# Email service
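# Each service declares its own queue before binding it to the exchange
channel.queue_declare(queue='email_queue')
channel.queue_bind(exchange='notifications', queue='email_queue')

# SMS service
channel.queue_declare(queue='sms_queue')
channel.queue_bind(exchange='notifications', queue='sms_queue')

# Analytics service
channel.queue_declare(queue='analytics_queue')
channel.queue_bind(exchange='notifications', queue='analytics_queue')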

Pattern 3: Topic Exchange (Pattern Matching)

Route messages based on pattern matching:

channel.exchange_declare(exchange='logs', exchange_type='topic')

# Routing keys follow format: <facility>.<severity>
# Examples: auth.error, payment.warning, order.info

# Publish
channel.basic_publish(
    exchange='logs',
    routing_key='auth.error',
    body='Authentication failed for user 123'
)

channel.basic_publish(
    exchange='logs',
    routing_key='payment.info',
    body='Payment processed successfully'
)

Consumers bind with wildcards, where * matches exactly one dot-separated word and # matches zero or more words:

# Receive all errors
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='*.error')

# Receive everything from auth
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='auth.*')

# Receive everything
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='#')
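
To make the wildcard rules concrete, here is a small sketch of how a binding pattern is compared with a routing key. `topic_matches` is a hypothetical helper written for illustration; it mirrors the documented semantics (`*` matches exactly one word, `#` matches zero or more) but is not the broker's actual implementation.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and p[i] in ("*", k[j]):
            # '*' or a literal word consumes exactly one word of the key.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


for pattern in ("*.error", "auth.*", "#"):
    print(pattern, topic_matches(pattern, "auth.error"))
```

Note that `auth.*` does not match a three-word key such as `auth.login.error`; a binding of `auth.#` would.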

Pattern 4: Work Queues (Load Balancing)

Distribute tasks among workers:

# Task producer
channel.queue_declare(queue='tasks', durable=True)

for i in range(100):
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=f'Task {i}',
        properties=pika.BasicProperties(
            delivery_mode=2  # Persist message
        )
    )

Workers:

import time

channel.queue_declare(queue='tasks', durable=True)

# Fair dispatch - don't give more than 1 unacked message
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    print(f"Processing: {body.decode()}")
    # Simulate work
    time.sleep(1)
    # Acknowledge completion
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
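
The effect of `prefetch_count=1` can be approximated without a broker. The sketch below is a simplified model under stated assumptions: without a prefetch limit, tasks are handed out strictly in turn regardless of how busy each worker is; with a limit of one, each task goes to whichever worker frees up first. The function names and task durations are hypothetical.

```python
import heapq


def round_robin_makespan(durations, workers):
    """Total time when tasks are pre-assigned to workers in strict rotation."""
    finish = [0] * workers
    for i, duration in enumerate(durations):
        finish[i % workers] += duration
    return max(finish)


def prefetch_one_makespan(durations, workers):
    """Total time when each task goes to the worker that becomes free first."""
    free_at = [0] * workers  # min-heap of times at which each worker is idle
    heapq.heapify(free_at)
    for duration in durations:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + duration)
    return max(free_at)


tasks = [5, 1, 5, 1, 5, 1]  # alternating slow and fast tasks, in seconds
print("round robin:", round_robin_makespan(tasks, workers=2))
print("prefetch=1: ", prefetch_one_makespan(tasks, workers=2))
```

With alternating slow and fast tasks, strict rotation piles every slow task onto the same worker, which is the situation fair dispatch is meant to avoid.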

Pattern 5: RPC (Request-Reply)

Synchronous-style communication over async transport:

import uuid

import pika

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # Create callback queue
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        
        self.response = None
        self.corr_id = None
    
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
    
    def call(self, message):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=message
        )
        
        while self.response is None:
            self.connection.process_data_events()
        
        return self.response

# Usage: the server parses the body as an integer, so send only the number
client = RpcClient()
response = client.call("30")
print(f"Result: {response.decode()}")

RPC Server:

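def fibonacci(n):
    # Iterative Fibonacci; stands in for any server-side computation
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

def on_request(ch, method, props, body):
    n = int(body)
    result = fibonacci(n)

    # Reply on the client's callback queue, echoing its correlation ID
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=str(result)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

# The request queue must exist before the server can consume from it
channel.queue_declare(queue='rpc_queue')
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
channel.start_consuming()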
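
One caveat with the client loop shown earlier is that it waits forever if the server never replies. A possible remedy is a deadline around the wait. The sketch below is an assumption-laden illustration: `wait_for_reply` is a hypothetical helper that takes two callables, so it can be exercised here with stand-ins instead of a live connection.

```python
import time


def wait_for_reply(get_reply, pump_events, timeout=5.0):
    """Poll for a reply until one arrives or the timeout elapses.

    get_reply:   callable returning the reply, or None if not yet received
    pump_events: callable that lets the client library process incoming frames
    """
    deadline = time.monotonic() + timeout
    while True:
        reply = get_reply()
        if reply is not None:
            return reply
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no RPC reply within {timeout} seconds")
        pump_events()


# Stand-ins for a real connection: the reply "arrives" after three event pumps.
state = {"pumps": 0}

def fake_pump():
    state["pumps"] += 1

def fake_get_reply():
    return b"832040" if state["pumps"] >= 3 else None

print(wait_for_reply(fake_get_reply, fake_pump, timeout=1.0))
```

Inside `RpcClient.call`, the two callables would plausibly be `lambda: self.response` and `lambda: self.connection.process_data_events(time_limit=0.1)`, so that each pump blocks briefly rather than spinning.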

Dead Letter Queues

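A dead letter exchange (DLX) receives messages that are rejected without requeue, expire via TTL, or overflow a queue's length limit, so they can be inspected or retried instead of being lost: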

# Main queue with dead letter exchange
channel.queue_declare(
    queue='orders',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed_orders',
        'x-message-ttl': 60000  # Optional: expire after 60s
    }
)

# Dead letter exchange and queue
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='failed_orders')
channel.queue_bind(exchange='dlx', queue='failed_orders', routing_key='failed_orders')

Best Practices

1. Always Acknowledge Messages

def callback(ch, method, properties, body):
    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Reject without requeueing: the message is dead-lettered if the queue has a DLX, otherwise discarded
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
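
Manual acknowledgments mean a message can be delivered again if a consumer crashes after processing but before the ack reaches the broker. One common way to cope is to make the handler idempotent. The sketch below is a minimal illustration: `IdempotentHandler` is a hypothetical wrapper, it assumes the producer sets a unique message ID on every message, and it keeps seen IDs in memory where a real system would use a durable store.

```python
class IdempotentHandler:
    """Wrap a handler so that each message ID is processed at most once."""

    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # in-memory only; lost on restart

    def handle(self, message_id, body):
        """Run the handler unless this ID was already processed.

        Returns True if the handler ran, False for a duplicate.
        """
        if message_id in self.seen:
            return False
        self.handler(body)
        # Record the ID only after the handler succeeds, so failures can be retried.
        self.seen.add(message_id)
        return True


processed = []
orders = IdempotentHandler(processed.append)
print(orders.handle("order-12345", b"charge card"))  # first delivery
print(orders.handle("order-12345", b"charge card"))  # redelivery of the same message
print(processed)
```

In a pika callback, the ID would come from `properties.message_id`, and the message should still be acknowledged when a duplicate is skipped.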

2. Make Messages Persistent

properties = pika.BasicProperties(
    delivery_mode=2,  # Persistent
    content_type='application/json'
)
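
The `content_type='application/json'` property only labels the payload; the producer still has to serialize the body, and the consumer has to parse it. A minimal sketch, using hypothetical helper names and an invented order payload:

```python
import json


def encode_order(order: dict) -> bytes:
    """Serialize an order to compact, deterministic UTF-8 JSON bytes."""
    return json.dumps(order, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_order(body: bytes) -> dict:
    """Parse a message body produced by encode_order."""
    return json.loads(body.decode("utf-8"))


body = encode_order({"order_id": 12345, "status": "placed"})
print(body)
print(decode_order(body))
```

The resulting bytes would be passed as `body=` to `basic_publish` together with the properties object.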

3. Use Publisher Confirms

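# Enable confirms; basic_publish then blocks until the broker responds
channel.confirm_delivery()

try:
    channel.basic_publish(...)  # publish with mandatory=True to have unroutable messages reported
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Message was rejected by the broker")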

4. Monitor Queue Depth

rabbitmqctl list_queues name messages consumers
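
The output of that command can be checked by a script. The sketch below assumes tab-separated columns in the order requested (name, messages, consumers) and skips any line that does not fit that shape, such as banners or a header row. The function names, sample text, and threshold are hypothetical.

```python
def parse_queue_depths(output: str):
    """Extract (name, messages, consumers) tuples from list_queues output."""
    rows = []
    for line in output.splitlines():
        parts = line.split("\t")
        if len(parts) != 3:
            continue  # banner or blank line
        name, messages, consumers = parts
        if not (messages.isdigit() and consumers.isdigit()):
            continue  # header row
        rows.append((name, int(messages), int(consumers)))
    return rows


def needs_attention(rows, max_messages=1000):
    """Flag queues that are too deep, or that hold messages with nobody consuming."""
    return [
        name
        for name, messages, consumers in rows
        if messages > max_messages or (messages > 0 and consumers == 0)
    ]


sample = (
    "Listing queues for vhost / ...\n"
    "name\tmessages\tconsumers\n"
    "orders\t1500\t2\n"
    "email_queue\t12\t0\n"
    "tasks\t3\t4\n"
)
print(needs_attention(parse_queue_depths(sample)))
```

A growing backlog with active consumers suggests adding workers, while a backlog with zero consumers usually points to a crashed or disconnected service.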

Conclusion

RabbitMQ patterns enable:

  • Decoupled architectures with pub/sub
  • Scalable processing with work queues
  • Reliable communication with acknowledgments
  • Flexible routing with topic exchanges

At Sajima Solutions, we design message-driven architectures that scale and remain resilient. Contact us to architect your distributed system.