As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
Python design patterns provide essential solutions for building resilient distributed systems and microservices. As a Python architect, I've implemented these patterns across numerous projects and found them invaluable for creating maintainable, scalable systems. Let me share eight critical patterns that will transform your distributed architecture.
## Circuit Breaker Pattern
The Circuit Breaker pattern prevents cascading failures in distributed systems. When a service becomes unavailable, the circuit breaker stops sending requests, allowing the failing component time to recover.
I've implemented this pattern numerous times, particularly when integrating with third-party APIs that occasionally experience downtime:
```python
import time
import functools
from enum import Enum

class CircuitState(Enum):
    CLOSED = 'closed'
    OPEN = 'open'
    HALF_OPEN = 'half_open'

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30,
                 name="default"):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    print(f"Circuit {self.name} switching to HALF_OPEN")
                else:
                    print(f"Circuit {self.name} is OPEN - fast failing")
                    raise CircuitBreakerError(f"Circuit {self.name} is open")
            try:
                result = func(*args, **kwargs)
                # Success - reset if in half-open state
                if self.state == CircuitState.HALF_OPEN:
                    self.reset()
                    print(f"Circuit {self.name} reset to CLOSED after success")
                return result
            except Exception:
                # Handle failures
                self.failure_count += 1
                self.last_failure_time = time.time()
                # Trip the breaker on any failure while HALF_OPEN, or once the
                # threshold is reached while CLOSED
                if self.state == CircuitState.HALF_OPEN or (
                    self.state == CircuitState.CLOSED
                    and self.failure_count >= self.failure_threshold
                ):
                    self.state = CircuitState.OPEN
                    print(f"Circuit {self.name} tripped to OPEN after {self.failure_count} failures")
                raise
        return wrapper

    def reset(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

class CircuitBreakerError(Exception):
    pass
```
This implementation includes state transitions (CLOSED, OPEN, HALF-OPEN) and automatic recovery attempts. I use this as a decorator for service functions:
```python
import requests

@CircuitBreaker(failure_threshold=3, recovery_timeout=10, name="payment_service")
def process_payment(payment_data):
    # External API call that might fail
    response = requests.post("https://payment.example.com/process", json=payment_data)
    response.raise_for_status()
    return response.json()
```
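When the breaker is open, callers get a `CircuitBreakerError` immediately instead of waiting on a dependency that is known to be down. A typical calling pattern is to catch that error and degrade gracefully; the fallback below is only a sketch of one option, not part of the breaker itself:

```python
def process_payment_with_fallback(payment_data):
    try:
        return process_payment(payment_data)
    except CircuitBreakerError:
        # Fast-failed: the circuit is open, so the call was never attempted.
        # Here we defer the payment instead of blocking the request.
        return {"status": "deferred", "reason": "payment service unavailable"}
    except requests.RequestException as exc:
        # A normal failure; it still counts toward the breaker's threshold.
        return {"status": "failed", "reason": str(exc)}
```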
## Saga Pattern
The Saga pattern helps maintain data consistency across microservices without distributed transactions. Each step in a business process has a corresponding compensation action to undo changes if a step fails.
Here's my implementation of a saga coordinator:
```python
import uuid
from enum import Enum
from typing import Dict, List, Callable, Any

class SagaStatus(Enum):
    STARTED = "started"
    COMPLETED = "completed"
    FAILED = "failed"

class SagaStep:
    def __init__(self, action, compensation):
        self.action = action
        self.compensation = compensation
        self.executed = False

class Saga:
    def __init__(self, name: str):
        self.name = name
        self.saga_id = str(uuid.uuid4())
        self.steps: List[SagaStep] = []
        self.context: Dict[str, Any] = {}
        self.status = SagaStatus.STARTED

    def add_step(self, action: Callable, compensation: Callable):
        """Add a step to the saga with its compensation action"""
        self.steps.append(SagaStep(action, compensation))
        return self

    def execute(self):
        """Execute the saga with compensation on failure"""
        executed_steps = []
        try:
            # Execute each step in sequence
            for step in self.steps:
                step.action(self.context)
                step.executed = True
                executed_steps.append(step)
            self.status = SagaStatus.COMPLETED
            return self.context
        except Exception as e:
            self.status = SagaStatus.FAILED
            # Compensate in reverse order
            for step in reversed(executed_steps):
                try:
                    step.compensation(self.context)
                except Exception as comp_error:
                    # Log compensation error but continue with other compensations
                    print(f"Compensation error in {self.name}: {comp_error}")
            raise SagaExecutionError(f"Saga {self.name} failed: {str(e)}")

class SagaExecutionError(Exception):
    pass
```
I've used this pattern for order processing workflows:
```python
def create_order(context):
    # Create order in order service
    order_id = order_service.create_order(context["customer_id"], context["items"])
    context["order_id"] = order_id

def delete_order(context):
    # Compensation: delete the order
    order_service.delete_order(context["order_id"])

def reserve_inventory(context):
    # Reserve inventory items
    inventory_service.reserve_items(context["items"])

def release_inventory(context):
    # Compensation: release reserved inventory
    inventory_service.release_items(context["items"])

def process_payment(context):
    # Process payment
    payment_id = payment_service.charge(context["customer_id"], context["total"])
    context["payment_id"] = payment_id

def refund_payment(context):
    # Compensation: refund payment
    payment_service.refund(context["payment_id"])

# Create and execute the saga
order_saga = Saga("order_processing")
order_saga.add_step(create_order, delete_order)
order_saga.add_step(reserve_inventory, release_inventory)
order_saga.add_step(process_payment, refund_payment)

try:
    result = order_saga.execute()
    print(f"Order completed successfully: {result}")
except SagaExecutionError as e:
    print(f"Order failed with compensation: {e}")
```
## Bulkhead Pattern
The Bulkhead pattern isolates components to prevent failures from cascading through the system. I implement this using thread pools and resource limits:
```python
import concurrent.futures
import threading
from dataclasses import dataclass

@dataclass
class Bulkhead:
    name: str
    max_concurrent_calls: int
    max_queue_size: int

    def __post_init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_concurrent_calls,
            thread_name_prefix=f"bulkhead-{self.name}"
        )
        # Bound in-flight plus queued work; reject anything beyond that
        self._semaphore = threading.Semaphore(
            self.max_concurrent_calls + self.max_queue_size
        )

    def execute(self, fn, *args, **kwargs):
        if not self._semaphore.acquire(blocking=False):
            raise BulkheadFullError(f"Bulkhead {self.name} is full")
        try:
            future = self._executor.submit(fn, *args, **kwargs)
            future.add_done_callback(lambda _: self._semaphore.release())
            return future
        except Exception:
            self._semaphore.release()
            raise

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

class BulkheadFullError(Exception):
    pass
```
I apply this pattern to isolate critical services:
```python
# Create bulkheads for different services
database_bulkhead = Bulkhead("database", max_concurrent_calls=10, max_queue_size=20)
payment_bulkhead = Bulkhead("payment", max_concurrent_calls=5, max_queue_size=10)
email_bulkhead = Bulkhead("email", max_concurrent_calls=20, max_queue_size=50)

# Use the bulkheads
try:
    # Database operations with limited concurrency
    future = database_bulkhead.execute(db_service.query, "SELECT * FROM users")
    result = future.result(timeout=2.0)  # Set timeout to avoid waiting forever

    # Payment processing with isolation
    payment_future = payment_bulkhead.execute(
        payment_service.process_payment,
        user_id=123,
        amount=99.99
    )
    payment_result = payment_future.result(timeout=5.0)
except BulkheadFullError as e:
    # Handle overload - perhaps retry later or return degraded response
    print(f"Service overloaded: {e}")
except concurrent.futures.TimeoutError:
    # Handle timeout
    print("Operation timed out")
```
## CQRS Pattern

The Command Query Responsibility Segregation (CQRS) pattern separates read operations from write operations, allowing each path to be optimized independently.
Here's a simplified implementation:
```python
from abc import ABC, abstractmethod
from typing import Dict, List, Any

# Command side
class Command(ABC):
    pass

class CommandHandler(ABC):
    @abstractmethod
    def handle(self, command: Command) -> None:
        pass

class CreateOrderCommand(Command):
    def __init__(self, order_id: str, customer_id: str, items: List[Dict]):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items

class CreateOrderHandler(CommandHandler):
    def __init__(self, repository):
        self.repository = repository

    def handle(self, command: CreateOrderCommand) -> None:
        # Business logic for creating an order
        order = {
            'order_id': command.order_id,
            'customer_id': command.customer_id,
            'items': command.items,
            'status': 'created'
        }
        # Store in write model
        self.repository.save(order)
        # Publish event for read model sync
        self.repository.publish_event('order_created', order)

# Query side
class Query(ABC):
    pass

class QueryHandler(ABC):
    @abstractmethod
    def handle(self, query: Query) -> Any:
        pass

class GetOrderQuery(Query):
    def __init__(self, order_id: str):
        self.order_id = order_id

class GetOrderHandler(QueryHandler):
    def __init__(self, read_repository):
        self.read_repository = read_repository

    def handle(self, query: GetOrderQuery) -> Dict:
        # Fetch from optimized read model
        return self.read_repository.get_order(query.order_id)

# Command bus
class CommandBus:
    def __init__(self):
        self.handlers = {}

    def register(self, command_type, handler):
        self.handlers[command_type] = handler

    def dispatch(self, command):
        handler = self.handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler registered for {type(command)}")
        return handler.handle(command)

# Query bus
class QueryBus:
    def __init__(self):
        self.handlers = {}

    def register(self, query_type, handler):
        self.handlers[query_type] = handler

    def dispatch(self, query):
        handler = self.handlers.get(type(query))
        if not handler:
            raise ValueError(f"No handler registered for {type(query)}")
        return handler.handle(query)
```
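The command and query handlers only need a write-side repository and a read-side repository. A minimal in-memory pair is enough to exercise the flow end to end; the class names and the direct read-model update are illustrative assumptions, since a real system would publish events to a broker:

```python
class InMemoryReadRepository:
    def __init__(self):
        self._orders_by_id = {}

    def apply_order_created(self, order):
        # Denormalized view optimized for reads
        self._orders_by_id[order['order_id']] = {
            'order_id': order['order_id'],
            'status': order['status'],
            'item_count': len(order['items']),
        }

    def get_order(self, order_id):
        return self._orders_by_id.get(order_id)

class InMemoryWriteRepository:
    def __init__(self, read_repository):
        self._orders = {}
        self._read_repository = read_repository

    def save(self, order):
        self._orders[order['order_id']] = order

    def publish_event(self, event_type, payload):
        # Stand-in for a message broker: update the read model directly
        if event_type == 'order_created':
            self._read_repository.apply_order_created(payload)

read_repository = InMemoryReadRepository()
write_repository = InMemoryWriteRepository(read_repository)
```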
In my projects, I reach for CQRS when read and write workloads differ significantly:
```python
import uuid

# Setup
command_bus = CommandBus()
query_bus = QueryBus()

# Register command handlers
command_bus.register(CreateOrderCommand, CreateOrderHandler(write_repository))

# Register query handlers
query_bus.register(GetOrderQuery, GetOrderHandler(read_repository))

# Client code - Command side
def create_order_endpoint(data):
    command = CreateOrderCommand(
        order_id=str(uuid.uuid4()),
        customer_id=data['customer_id'],
        items=data['items']
    )
    command_bus.dispatch(command)
    return {"status": "order created", "order_id": command.order_id}

# Client code - Query side
def get_order_endpoint(order_id):
    query = GetOrderQuery(order_id)
    result = query_bus.dispatch(query)
    return result
```
## API Gateway Pattern
The API Gateway pattern centralizes cross-cutting concerns and provides a single entry point to microservices. I've implemented lightweight gateways using FastAPI:
```python
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer
import httpx
import jwt
import time

app = FastAPI(title="API Gateway")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Service registry (could be dynamic with service discovery)
SERVICES = {
    "users": "http://user-service:8001",
    "orders": "http://order-service:8002",
    "products": "http://product-service:8003",
}

JWT_SECRET = "your-secret-key"  # In production, use secure environment variables

# Authentication dependency
async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token")
        return {"user_id": user_id}
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Rate limiting dependency
class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.requests = {}

    async def __call__(self, request: Request):
        client_ip = request.client.host
        current_time = time.time()
        # Clean old entries
        self.requests = {ip: times for ip, times in self.requests.items()
                         if current_time - times[-1] < 60}
        # Get client's request times
        request_times = self.requests.get(client_ip, [])
        # Remove requests older than 1 minute
        request_times = [t for t in request_times if current_time - t < 60]
        if len(request_times) >= self.requests_per_minute:
            raise HTTPException(status_code=429, detail="Rate limit exceeded")
        # Add current request
        request_times.append(current_time)
        self.requests[client_ip] = request_times

rate_limiter = RateLimiter(requests_per_minute=100)

# Service proxy with timeout
async def proxy_to_service(service: str, path: str, request: Request):
    if service not in SERVICES:
        raise HTTPException(status_code=404, detail=f"Service {service} not found")

    # Get target URL
    target_url = f"{SERVICES[service]}{path}"

    # Forward method, headers, and body
    method = request.method
    headers = dict(request.headers)
    headers.pop("host", None)  # Remove host header
    body = await request.body()

    # Use httpx for async HTTP requests
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.request(
                method=method,
                url=target_url,
                headers=headers,
                content=body,
            )
            content_type = response.headers.get("content-type", "")
            return {
                "status_code": response.status_code,
                "content": response.json() if "application/json" in content_type else response.text,
                "headers": dict(response.headers)
            }
        except httpx.TimeoutException:
            raise HTTPException(status_code=504, detail="Service timeout")
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Error: {str(e)}")

# API Gateway routes
@app.get("/")
async def root():
    return {"message": "API Gateway"}

@app.get("/health")
async def health():
    return {"status": "healthy", "timestamp": time.time()}

# Public routes (no auth); api_route lets the proxy forward any HTTP method
@app.api_route("/public/{service}{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
async def public_route(service: str, path: str, request: Request):
    await rate_limiter(request)
    return await proxy_to_service(service, path, request)

# Protected routes (with auth)
@app.api_route("/api/{service}{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
async def protected_route(
    service: str,
    path: str,
    request: Request,
    user: dict = Depends(get_current_user)
):
    await rate_limiter(request)
    return await proxy_to_service(service, path, request)
```
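The gateway only verifies tokens; nothing above shows where they come from. For local testing you can mint a token that the `get_current_user` dependency will accept using the same shared secret. This is purely illustrative, since a real deployment would delegate token issuance to an identity provider:

```python
import time
import jwt

def issue_test_token(user_id: str, ttl_seconds: int = 3600) -> str:
    # Sign a short-lived token carrying the subject claim the gateway checks
    payload = {"sub": user_id, "exp": int(time.time()) + ttl_seconds}
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")

# Example request through the protected route:
#   curl -H "Authorization: Bearer <token>" http://localhost:8000/api/orders/recent
print(issue_test_token("user-123"))
```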
## Event Sourcing Pattern

The Event Sourcing pattern stores every state change as an append-only sequence of events instead of overwriting current state. Replaying those events reconstructs an aggregate, and publishing them enables event-driven communication between services.
Here's my implementation:
```python
import json
import time
import uuid
from typing import Dict, List, Any, Optional, Callable

class Event:
    def __init__(self, event_type: str, data: Dict[str, Any],
                 aggregate_id: str, version: int):
        self.id = str(uuid.uuid4())
        self.event_type = event_type
        self.data = data
        self.aggregate_id = aggregate_id
        self.version = version
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "event_type": self.event_type,
            "data": self.data,
            "aggregate_id": self.aggregate_id,
            "version": self.version,
            "timestamp": self.timestamp
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Event":
        event = cls(
            event_type=data["event_type"],
            data=data["data"],
            aggregate_id=data["aggregate_id"],
            version=data["version"]
        )
        event.id = data["id"]
        event.timestamp = data["timestamp"]
        return event

class EventStore:
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}
        self.subscribers: Dict[str, List[Callable]] = {}

    def save_event(self, event: Event) -> None:
        """Save an event to the store and publish it to subscribers"""
        if event.aggregate_id not in self.events:
            self.events[event.aggregate_id] = []
        # Check for version conflicts
        events = self.events[event.aggregate_id]
        if events and events[-1].version >= event.version:
            raise ValueError(f"Version conflict for {event.aggregate_id}")
        # Store the event
        self.events[event.aggregate_id].append(event)
        # Publish the event
        self._publish(event)

    def get_events(self, aggregate_id: str,
                   since_version: int = 0) -> List[Event]:
        """Get all events for an aggregate ID since a specific version"""
        events = self.events.get(aggregate_id, [])
        return [e for e in events if e.version > since_version]

    def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Subscribe to events of a specific type"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(callback)

    def _publish(self, event: Event) -> None:
        """Publish event to subscribers"""
        subscribers = self.subscribers.get(event.event_type, [])
        for subscriber in subscribers:
            try:
                subscriber(event)
            except Exception as e:
                # In production, log this error but don't stop processing
                print(f"Error in subscriber: {e}")

class Aggregate:
    """Base class for aggregates that use event sourcing"""
    def __init__(self, id: str, event_store: EventStore):
        self.id = id
        self.version = 0
        self._event_store = event_store
        self._changes: List[Event] = []

    def load_from_history(self) -> None:
        """Load the aggregate state from its event history"""
        events = self._event_store.get_events(self.id)
        for event in events:
            self._apply_event(event)
            self.version = event.version

    def apply_event(self, event_type: str, data: Dict[str, Any]) -> None:
        """Apply a new event to the aggregate"""
        event = Event(
            event_type=event_type,
            data=data,
            aggregate_id=self.id,
            version=self.version + 1
        )
        # Apply event to update current state
        self._apply_event(event)
        # Track the change to be committed
        self._changes.append(event)
        self.version += 1

    def commit(self) -> None:
        """Commit all pending changes to the event store"""
        for event in self._changes:
            self._event_store.save_event(event)
        self._changes = []

    def _apply_event(self, event: Event) -> None:
        """Apply an event to the aggregate - override in subclasses"""
        method_name = f"apply_{event.event_type}"
        method = getattr(self, method_name, None)
        if method:
            method(event.data)
```
Here's an order management example using event sourcing:
```python
class Order(Aggregate):
    def __init__(self, id: str, event_store: EventStore):
        super().__init__(id, event_store)
        self.status = "new"
        self.items = []
        self.customer_id = None
        self.total = 0

    def create(self, customer_id: str) -> None:
        self.apply_event("order_created", {
            "customer_id": customer_id
        })

    def add_item(self, product_id: str, quantity: int, price: float) -> None:
        if self.status != "new":
            raise ValueError("Cannot add items to non-new order")
        self.apply_event("item_added", {
            "product_id": product_id,
            "quantity": quantity,
            "price": price
        })

    def submit(self) -> None:
        if not self.items:
            raise ValueError("Cannot submit empty order")
        self.apply_event("order_submitted", {})

    def apply_order_created(self, data: Dict[str, Any]) -> None:
        self.customer_id = data["customer_id"]
        self.status = "new"

    def apply_item_added(self, data: Dict[str, Any]) -> None:
        item = {
            "product_id": data["product_id"],
            "quantity": data["quantity"],
            "price": data["price"]
        }
        self.items.append(item)
        self.total += data["quantity"] * data["price"]

    def apply_order_submitted(self, data: Dict[str, Any]) -> None:
        self.status = "submitted"

# Using the event sourcing system
event_store = EventStore()

# Create a new order
order = Order("order-123", event_store)
order.create("customer-456")
order.add_item("product-789", 2, 10.99)
order.add_item("product-101", 1, 24.99)
order.submit()
order.commit()

# Later, reconstruct the order state
loaded_order = Order("order-123", event_store)
loaded_order.load_from_history()
print(f"Order total: ${loaded_order.total}")
print(f"Order status: {loaded_order.status}")
print(f"Items count: {len(loaded_order.items)}")
```
## Health Check Pattern
The Health Check pattern implements monitoring endpoints to report service status:
```python
import time
import asyncio
import threading
from enum import Enum
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass, field

class HealthStatus(Enum):
    UP = "UP"
    DOWN = "DOWN"
    DEGRADED = "DEGRADED"
    UNKNOWN = "UNKNOWN"

@dataclass
class HealthCheck:
    name: str
    check_function: Callable[[], bool]
    timeout: float = 5.0
    importance: str = "critical"  # critical, high, medium, low

@dataclass
class DependencyHealth:
    name: str
    status: HealthStatus = HealthStatus.UNKNOWN
    last_checked: float = 0
    details: Dict[str, Any] = field(default_factory=dict)

class HealthMonitor:
    def __init__(self, service_name: str, version: str):
        self.service_name = service_name
        self.version = version
        self.checks: List[HealthCheck] = []
        self.dependencies: Dict[str, DependencyHealth] = {}
        self.startup_time = time.time()
        # Re-entrant lock: update_dependency calls register_dependency while holding it
        self._lock = threading.RLock()

    def register_check(self, name: str, check_function: Callable[[], bool],
                       timeout: float = 5.0, importance: str = "critical") -> None:
        """Register a new health check"""
        check = HealthCheck(name, check_function, timeout, importance)
        self.checks.append(check)

    def register_dependency(self, name: str) -> None:
        """Register a dependency to monitor"""
        with self._lock:
            self.dependencies[name] = DependencyHealth(name)

    def update_dependency(self, name: str, status: HealthStatus,
                          details: Optional[Dict[str, Any]] = None) -> None:
        """Update the health status of a dependency"""
        with self._lock:
            if name not in self.dependencies:
                self.register_dependency(name)
            dep = self.dependencies[name]
            dep.status = status
            dep.last_checked = time.time()
            if details:
                dep.details = details

    async def run_checks(self) -> Dict[str, Any]:
        """Run all registered health checks"""
        results = {}
        for check in self.checks:
            try:
                # Run check in a worker thread with a timeout
                loop = asyncio.get_running_loop()
                result = await asyncio.wait_for(
                    loop.run_in_executor(None, check.check_function),
                    timeout=check.timeout
                )
                results[check.name] = {
                    "status": HealthStatus.UP.value if result else HealthStatus.DOWN.value,
                    "importance": check.importance
                }
            except asyncio.TimeoutError:
                results[check.name] = {
                    "status": HealthStatus.DOWN.value,
                    "importance": check.importance,
                    "error": "Timeout"
                }
            except Exception as e:
                results[check.name] = {
                    "status": HealthStatus.DOWN.value,
                    "importance": check.importance,
                    "error": str(e)
                }
        return results

    async def get_health(self) -> Dict[str, Any]:
        """Get complete health information"""
        check_results = await self.run_checks()

        # Determine overall status
        status = HealthStatus.UP

        # Check for critical failures
        for name, result in check_results.items():
            if (result["importance"] == "critical" and
                    result["status"] == HealthStatus.DOWN.value):
                status = HealthStatus.DOWN
                break

        # Check for high importance degradation
        if status != HealthStatus.DOWN:
            for name, result in check_results.items():
                if (result["importance"] in ["critical", "high"] and
                        result["status"] != HealthStatus.UP.value):
                    status = HealthStatus.DEGRADED
                    break

        # Build health response
        with self._lock:
            deps = {name: {"status": dep.status.value, "details": dep.details}
                    for name, dep in self.dependencies.items()}

        return {
            "status": status.value,
            "service": self.service_name,
            "version": self.version,
            "uptime": time.time() - self.startup_time,
            "checks": check_results,
            "dependencies": deps
        }
```
To use this with FastAPI:
```python
from fastapi import FastAPI, HTTPException

app = FastAPI()
health_monitor = HealthMonitor("order-service", "1.0.0")

# Register health checks
health_monitor.register_check(
    "database",
    lambda: db_pool.is_connected(),
    importance="critical"
)
health_monitor.register_check(
    "disk_space",
    lambda: check_disk_space() > 100_000_000,  # 100MB free
    importance="high"
)

# Update dependency status
health_monitor.update_dependency(
    "payment-service",
    HealthStatus.UP,
    {"endpoint": "https://payment.example.com/api"}
)

@app.get("/health")
async def health():
    """Basic health endpoint for load balancers"""
    health_info = await health_monitor.get_health()
    if health_info["status"] == HealthStatus.DOWN.value:
        raise HTTPException(status_code=503, detail="Service Unavailable")
    return {"status": "UP"}

@app.get("/health/details")
async def health_details():
    """Detailed health information for monitoring systems"""
    return await health_monitor.get_health()
```
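The `db_pool` and `check_disk_space` helpers above are placeholders for whatever your service actually depends on. The disk check, for instance, can be a few lines of standard library code; this hypothetical helper returns free bytes so the lambda's threshold comparison works:

```python
import shutil

def check_disk_space(path: str = "/") -> int:
    # Free bytes on the filesystem containing `path`
    return shutil.disk_usage(path).free
```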
## Sidecar Pattern
The Sidecar pattern deploys helper services alongside main services to handle cross-cutting concerns:
```python
import os
import time
import signal
import subprocess
import threading
from typing import List, Dict, Any, Optional
import logging
import json

class SidecarProcess:
    def __init__(self, name: str, command: List[str], env: Optional[Dict[str, str]] = None):
        self.name = name
        self.command = command
        self.env = env or {}
        self.process = None
        self.stopped = False
        self.logger = logging.getLogger(f"sidecar.{name}")

    def start(self) -> bool:
        """Start the sidecar process"""
        try:
            # Merge current environment with sidecar-specific env
            process_env = {**os.environ, **self.env}
            self.process = subprocess.Popen(self.command, env=process_env)
            self.logger.info("Started sidecar %s (pid %s)", self.name, self.process.pid)
            return True
        except Exception as e:
            self.logger.error("Failed to start sidecar %s: %s", self.name, e)
            return False
```
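A sidecar also needs an orderly shutdown and wiring next to the main process. The sketch below adds a stop method and shows one way to run a log forwarder beside the service; the fluent-bit command and the `run_main_service` call are illustrative assumptions rather than a fixed recipe:

```python
class ManagedSidecar(SidecarProcess):
    def stop(self, timeout: float = 10.0) -> None:
        """Stop the sidecar, escalating from SIGTERM to SIGKILL"""
        if not self.process or self.stopped:
            return
        self.process.send_signal(signal.SIGTERM)
        try:
            self.process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            self.process.kill()
        self.stopped = True

# Illustrative wiring: run a log forwarder beside the main application
log_sidecar = ManagedSidecar(
    name="log-forwarder",
    command=["fluent-bit", "-c", "/etc/fluent-bit/fluent-bit.conf"],
    env={"SERVICE_NAME": "order-service"},
)

if log_sidecar.start():
    try:
        run_main_service()  # placeholder for the primary application loop
    finally:
        log_sidecar.stop()
```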
---
## 101 Books
**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.
Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!
## Our Creations
Be sure to check out our creations:
**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**
---
### We are on Medium
**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**