DEV Community

Cover image for 8 Python Design Patterns for Resilient Distributed Systems and Microservices
Aarav Joshi
Aarav Joshi

Posted on

8 Python Design Patterns for Resilient Distributed Systems and Microservices

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Python design patterns provide essential solutions for building resilient distributed systems and microservices. As a Python architect, I've implemented these patterns across numerous projects and found them invaluable for creating maintainable, scalable systems. Let me share eight critical patterns that will transform your distributed architecture.

Circuit Breaker Pattern

The Circuit Breaker pattern prevents cascading failures in distributed systems. When a service becomes unavailable, the circuit breaker stops sending requests, allowing the failing component time to recover.

I've implemented this pattern numerous times, particularly when integrating with third-party APIs that occasionally experience downtime:

import time
import functools
from enum import Enum

class CircuitState(Enum):
    CLOSED = 'closed'
    OPEN = 'open'
    HALF_OPEN = 'half_open'

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30, 
                 name="default"):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    print(f"Circuit {self.name} switching to HALF_OPEN")
                else:
                    print(f"Circuit {self.name} is OPEN - fast failing")
                    raise CircuitBreakerError(f"Circuit {self.name} is open")

            try:
                result = func(*args, **kwargs)

                # Success - reset if in half-open state
                if self.state == CircuitState.HALF_OPEN:
                    self.reset()
                    print(f"Circuit {self.name} reset to CLOSED after success")

                return result

            except Exception as e:
                # Handle failures
                self.failure_count += 1
                self.last_failure_time = time.time()

                if self.state == CircuitState.CLOSED and self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
                    print(f"Circuit {self.name} tripped to OPEN after {self.failure_count} failures")

                raise e

        return wrapper

    def reset(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

class CircuitBreakerError(Exception):
    pass
Enter fullscreen mode Exit fullscreen mode

This implementation includes state transitions (CLOSED, OPEN, HALF-OPEN) and automatic recovery attempts. I use this as a decorator for service functions:

@CircuitBreaker(failure_threshold=3, recovery_timeout=10, name="payment_service")
def process_payment(payment_data):
    # External API call that might fail
    response = requests.post("https://payment.example.com/process", json=payment_data)
    response.raise_for_status()
    return response.json()
Enter fullscreen mode Exit fullscreen mode

Saga Pattern

The Saga pattern helps maintain data consistency across microservices without distributed transactions. Each step in a business process has a corresponding compensation action to undo changes if a step fails.

Here's my implementation of a saga coordinator:

import uuid
from enum import Enum
from typing import Dict, List, Callable, Any

class SagaStatus(Enum):
    STARTED = "started"
    COMPLETED = "completed"
    FAILED = "failed"

class SagaStep:
    def __init__(self, action, compensation):
        self.action = action
        self.compensation = compensation
        self.executed = False

class Saga:
    def __init__(self, name: str):
        self.name = name
        self.saga_id = str(uuid.uuid4())
        self.steps: List[SagaStep] = []
        self.context: Dict[str, Any] = {}
        self.status = SagaStatus.STARTED

    def add_step(self, action: Callable, compensation: Callable):
        """Add a step to the saga with its compensation action"""
        self.steps.append(SagaStep(action, compensation))
        return self

    def execute(self):
        """Execute the saga with compensation on failure"""
        executed_steps = []

        try:
            # Execute each step in sequence
            for step in self.steps:
                step.action(self.context)
                step.executed = True
                executed_steps.append(step)

            self.status = SagaStatus.COMPLETED
            return self.context

        except Exception as e:
            self.status = SagaStatus.FAILED

            # Compensate in reverse order
            for step in reversed(executed_steps):
                try:
                    step.compensation(self.context)
                except Exception as comp_error:
                    # Log compensation error but continue with other compensations
                    print(f"Compensation error in {self.name}: {comp_error}")

            raise SagaExecutionError(f"Saga {self.name} failed: {str(e)}")

class SagaExecutionError(Exception):
    pass
Enter fullscreen mode Exit fullscreen mode

I've used this pattern for order processing workflows:

def create_order(context):
    # Create order in order service
    order_id = order_service.create_order(context["customer_id"], context["items"])
    context["order_id"] = order_id

def delete_order(context):
    # Compensation: delete the order
    order_service.delete_order(context["order_id"])

def reserve_inventory(context):
    # Reserve inventory items
    inventory_service.reserve_items(context["items"])

def release_inventory(context):
    # Compensation: release reserved inventory
    inventory_service.release_items(context["items"])

def process_payment(context):
    # Process payment
    payment_id = payment_service.charge(context["customer_id"], context["total"])
    context["payment_id"] = payment_id

def refund_payment(context):
    # Compensation: refund payment
    payment_service.refund(context["payment_id"])

# Create and execute the saga
order_saga = Saga("order_processing")
order_saga.add_step(create_order, delete_order)
order_saga.add_step(reserve_inventory, release_inventory)
order_saga.add_step(process_payment, refund_payment)

try:
    result = order_saga.execute()
    print(f"Order completed successfully: {result}")
except SagaExecutionError as e:
    print(f"Order failed with compensation: {e}")
Enter fullscreen mode Exit fullscreen mode

Bulkhead Pattern

The Bulkhead pattern isolates components to prevent failures from cascading through the system. I implement this using thread pools and resource limits:

import concurrent.futures
from dataclasses import dataclass

@dataclass
class Bulkhead:
    name: str
    max_concurrent_calls: int
    max_queue_size: int

    def __post_init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_concurrent_calls,
            thread_name_prefix=f"bulkhead-{self.name}"
        )
        self._semaphore = concurrent.futures.Semaphore(
            self.max_concurrent_calls + self.max_queue_size
        )

    def execute(self, fn, *args, **kwargs):
        if not self._semaphore.acquire(blocking=False):
            raise BulkheadFullError(f"Bulkhead {self.name} is full")

        try:
            future = self._executor.submit(fn, *args, **kwargs)
            future.add_done_callback(lambda _: self._semaphore.release())
            return future
        except Exception:
            self._semaphore.release()
            raise

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

class BulkheadFullError(Exception):
    pass
Enter fullscreen mode Exit fullscreen mode

I apply this pattern to isolate critical services:

# Create bulkheads for different services
database_bulkhead = Bulkhead("database", max_concurrent_calls=10, max_queue_size=20)
payment_bulkhead = Bulkhead("payment", max_concurrent_calls=5, max_queue_size=10)
email_bulkhead = Bulkhead("email", max_concurrent_calls=20, max_queue_size=50)

# Use the bulkheads
try:
    # Database operations with limited concurrency
    future = database_bulkhead.execute(db_service.query, "SELECT * FROM users")
    result = future.result(timeout=2.0)  # Set timeout to avoid waiting forever

    # Payment processing with isolation
    payment_future = payment_bulkhead.execute(
        payment_service.process_payment, 
        user_id=123, 
        amount=99.99
    )
    payment_result = payment_future.result(timeout=5.0)

except BulkheadFullError as e:
    # Handle overload - perhaps retry later or return degraded response
    print(f"Service overloaded: {e}")
except concurrent.futures.TimeoutError:
    # Handle timeout
    print("Operation timed out")
Enter fullscreen mode Exit fullscreen mode

CQRS Pattern

The Command Query Responsibility Segregation pattern separates read and write operations. This allows for optimizing each pathway independently.

Here's a simplified implementation:

from abc import ABC, abstractmethod
from typing import Dict, List, Any

# Command side
class Command(ABC):
    pass

class CommandHandler(ABC):
    @abstractmethod
    def handle(self, command: Command) -> None:
        pass

class CreateOrderCommand(Command):
    def __init__(self, order_id: str, customer_id: str, items: List[Dict]):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items

class CreateOrderHandler(CommandHandler):
    def __init__(self, repository):
        self.repository = repository

    def handle(self, command: CreateOrderCommand) -> None:
        # Business logic for creating an order
        order = {
            'order_id': command.order_id,
            'customer_id': command.customer_id,
            'items': command.items,
            'status': 'created'
        }
        # Store in write model
        self.repository.save(order)
        # Publish event for read model sync
        self.repository.publish_event('order_created', order)

# Query side
class Query(ABC):
    pass

class QueryHandler(ABC):
    @abstractmethod
    def handle(self, query: Query) -> Any:
        pass

class GetOrderQuery(Query):
    def __init__(self, order_id: str):
        self.order_id = order_id

class GetOrderHandler(QueryHandler):
    def __init__(self, read_repository):
        self.read_repository = read_repository

    def handle(self, query: GetOrderQuery) -> Dict:
        # Fetch from optimized read model
        return self.read_repository.get_order(query.order_id)

# Command bus
class CommandBus:
    def __init__(self):
        self.handlers = {}

    def register(self, command_type, handler):
        self.handlers[command_type] = handler

    def dispatch(self, command):
        handler = self.handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler registered for {type(command)}")
        return handler.handle(command)

# Query bus
class QueryBus:
    def __init__(self):
        self.handlers = {}

    def register(self, query_type, handler):
        self.handlers[query_type] = handler

    def dispatch(self, query):
        handler = self.handlers.get(type(query))
        if not handler:
            raise ValueError(f"No handler registered for {type(query)}")
        return handler.handle(query)
Enter fullscreen mode Exit fullscreen mode

In my projects, I use CQRS when reading and writing needs differ significantly:

# Setup
command_bus = CommandBus()
query_bus = QueryBus()

# Register command handlers
command_bus.register(CreateOrderCommand, CreateOrderHandler(write_repository))

# Register query handlers
query_bus.register(GetOrderQuery, GetOrderHandler(read_repository))

# Client code - Command side
def create_order_endpoint(data):
    command = CreateOrderCommand(
        order_id=str(uuid.uuid4()),
        customer_id=data['customer_id'],
        items=data['items']
    )
    command_bus.dispatch(command)
    return {"status": "order created", "order_id": command.order_id}

# Client code - Query side
def get_order_endpoint(order_id):
    query = GetOrderQuery(order_id)
    result = query_bus.dispatch(query)
    return result
Enter fullscreen mode Exit fullscreen mode

API Gateway Pattern

The API Gateway pattern centralizes cross-cutting concerns and provides a single entry point to microservices. I've implemented lightweight gateways using FastAPI:

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer
import httpx
import jwt
import time

app = FastAPI(title="API Gateway")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Service registry (could be dynamic with service discovery)
SERVICES = {
    "users": "http://user-service:8001",
    "orders": "http://order-service:8002",
    "products": "http://product-service:8003",
}

JWT_SECRET = "your-secret-key"  # In production, use secure environment variables

# Authentication middleware
async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token")
        return {"user_id": user_id}
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Rate limiting middleware
class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.requests = {}

    async def __call__(self, request: Request):
        client_ip = request.client.host
        current_time = time.time()

        # Clean old entries
        self.requests = {ip: times for ip, times in self.requests.items() 
                        if current_time - times[-1] < 60}

        # Get client's request times
        request_times = self.requests.get(client_ip, [])

        # Remove requests older than 1 minute
        request_times = [t for t in request_times if current_time - t < 60]

        if len(request_times) >= self.requests_per_minute:
            raise HTTPException(status_code=429, detail="Rate limit exceeded")

        # Add current request
        request_times.append(current_time)
        self.requests[client_ip] = request_times

rate_limiter = RateLimiter(requests_per_minute=100)

# Service proxy with timeout
async def proxy_to_service(service: str, path: str, request: Request):
    if service not in SERVICES:
        raise HTTPException(status_code=404, detail=f"Service {service} not found")

    # Get target URL
    target_url = f"{SERVICES[service]}{path}"

    # Forward method, headers, and body
    method = request.method
    headers = dict(request.headers)
    headers.pop("host", None)  # Remove host header

    body = await request.body()

    # Use httpx for async HTTP requests
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.request(
                method=method,
                url=target_url,
                headers=headers,
                content=body,
            )
            return {
                "status_code": response.status_code,
                "content": response.json() if response.headers.get("content-type") == "application/json" else response.text,
                "headers": dict(response.headers)
            }
        except httpx.TimeoutException:
            raise HTTPException(status_code=504, detail="Service timeout")
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Error: {str(e)}")

# API Gateway routes
@app.get("/")
async def root():
    return {"message": "API Gateway"}

@app.get("/health")
async def health():
    return {"status": "healthy", "timestamp": time.time()}

# Public routes (no auth)
@app.get("/public/{service}{path:path}")
async def public_route(service: str, path: str, request: Request):
    await rate_limiter(request)
    return await proxy_to_service(service, path, request)

# Protected routes (with auth)
@app.get("/api/{service}{path:path}")
async def protected_route(
    service: str, 
    path: str, 
    request: Request,
    user: dict = Depends(get_current_user)
):
    await rate_limiter(request)
    return await proxy_to_service(service, path, request)
Enter fullscreen mode Exit fullscreen mode

Event Sourcing Pattern

The Event Sourcing pattern stores state changes as a sequence of events. This enables powerful event-driven communication between services.

Here's my implementation:

import json
import time
import uuid
from typing import Dict, List, Any, Optional, Callable

class Event:
    def __init__(self, event_type: str, data: Dict[str, Any], 
                 aggregate_id: str, version: int):
        self.id = str(uuid.uuid4())
        self.event_type = event_type
        self.data = data
        self.aggregate_id = aggregate_id
        self.version = version
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "event_type": self.event_type,
            "data": self.data,
            "aggregate_id": self.aggregate_id,
            "version": self.version,
            "timestamp": self.timestamp
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Event":
        event = cls(
            event_type=data["event_type"],
            data=data["data"],
            aggregate_id=data["aggregate_id"],
            version=data["version"]
        )
        event.id = data["id"]
        event.timestamp = data["timestamp"]
        return event

class EventStore:
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}
        self.subscribers: Dict[str, List[Callable]] = {}

    def save_event(self, event: Event) -> None:
        """Save an event to the store and publish it to subscribers"""
        if event.aggregate_id not in self.events:
            self.events[event.aggregate_id] = []

        # Check for version conflicts
        events = self.events[event.aggregate_id]
        if events and events[-1].version >= event.version:
            raise ValueError(f"Version conflict for {event.aggregate_id}")

        # Store the event
        self.events[event.aggregate_id].append(event)

        # Publish the event
        self._publish(event)

    def get_events(self, aggregate_id: str, 
                  since_version: int = 0) -> List[Event]:
        """Get all events for an aggregate ID since a specific version"""
        events = self.events.get(aggregate_id, [])
        return [e for e in events if e.version > since_version]

    def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Subscribe to events of a specific type"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(callback)

    def _publish(self, event: Event) -> None:
        """Publish event to subscribers"""
        subscribers = self.subscribers.get(event.event_type, [])
        for subscriber in subscribers:
            try:
                subscriber(event)
            except Exception as e:
                # In production, log this error but don't stop processing
                print(f"Error in subscriber: {e}")

class Aggregate:
    """Base class for aggregates that use event sourcing"""

    def __init__(self, id: str, event_store: EventStore):
        self.id = id
        self.version = 0
        self._event_store = event_store
        self._changes: List[Event] = []

    def load_from_history(self) -> None:
        """Load the aggregate state from its event history"""
        events = self._event_store.get_events(self.id)
        for event in events:
            self._apply_event(event)
            self.version = event.version

    def apply_event(self, event_type: str, data: Dict[str, Any]) -> None:
        """Apply a new event to the aggregate"""
        event = Event(
            event_type=event_type,
            data=data,
            aggregate_id=self.id,
            version=self.version + 1
        )

        # Apply event to update current state
        self._apply_event(event)

        # Track the change to be committed
        self._changes.append(event)
        self.version += 1

    def commit(self) -> None:
        """Commit all pending changes to the event store"""
        for event in self._changes:
            self._event_store.save_event(event)
        self._changes = []

    def _apply_event(self, event: Event) -> None:
        """Apply an event to the aggregate - override in subclasses"""
        method_name = f"apply_{event.event_type}"
        method = getattr(self, method_name, None)
        if method:
            method(event.data)
Enter fullscreen mode Exit fullscreen mode

Here's an order management example using event sourcing:

class Order(Aggregate):
    def __init__(self, id: str, event_store: EventStore):
        super().__init__(id, event_store)
        self.status = "new"
        self.items = []
        self.customer_id = None
        self.total = 0

    def create(self, customer_id: str) -> None:
        self.apply_event("order_created", {
            "customer_id": customer_id
        })

    def add_item(self, product_id: str, quantity: int, price: float) -> None:
        if self.status != "new":
            raise ValueError("Cannot add items to non-new order")

        self.apply_event("item_added", {
            "product_id": product_id,
            "quantity": quantity,
            "price": price
        })

    def submit(self) -> None:
        if not self.items:
            raise ValueError("Cannot submit empty order")

        self.apply_event("order_submitted", {})

    def apply_order_created(self, data: Dict[str, Any]) -> None:
        self.customer_id = data["customer_id"]
        self.status = "new"

    def apply_item_added(self, data: Dict[str, Any]) -> None:
        item = {
            "product_id": data["product_id"],
            "quantity": data["quantity"],
            "price": data["price"]
        }
        self.items.append(item)
        self.total += data["quantity"] * data["price"]

    def apply_order_submitted(self, data: Dict[str, Any]) -> None:
        self.status = "submitted"

# Using the event sourcing system
event_store = EventStore()

# Create a new order
order = Order("order-123", event_store)
order.create("customer-456")
order.add_item("product-789", 2, 10.99)
order.add_item("product-101", 1, 24.99)
order.submit()
order.commit()

# Later, reconstruct the order state
loaded_order = Order("order-123", event_store)
loaded_order.load_from_history()
print(f"Order total: ${loaded_order.total}")
print(f"Order status: {loaded_order.status}")
print(f"Items count: {len(loaded_order.items)}")
Enter fullscreen mode Exit fullscreen mode

Health Check Pattern

The Health Check pattern implements monitoring endpoints to report service status:

import time
import asyncio
import threading
from enum import Enum
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass, field

class HealthStatus(Enum):
    UP = "UP"
    DOWN = "DOWN"
    DEGRADED = "DEGRADED"
    UNKNOWN = "UNKNOWN"

@dataclass
class HealthCheck:
    name: str
    check_function: Callable[[], bool]
    timeout: float = 5.0
    importance: str = "critical"  # critical, high, medium, low

@dataclass
class DependencyHealth:
    name: str
    status: HealthStatus = HealthStatus.UNKNOWN
    last_checked: float = 0
    details: Dict[str, Any] = field(default_factory=dict)

class HealthMonitor:
    def __init__(self, service_name: str, version: str):
        self.service_name = service_name
        self.version = version
        self.checks: List[HealthCheck] = []
        self.dependencies: Dict[str, DependencyHealth] = {}
        self.startup_time = time.time()
        self._lock = threading.Lock()

    def register_check(self, name: str, check_function: Callable[[], bool], 
                      timeout: float = 5.0, importance: str = "critical") -> None:
        """Register a new health check"""
        check = HealthCheck(name, check_function, timeout, importance)
        self.checks.append(check)

    def register_dependency(self, name: str) -> None:
        """Register a dependency to monitor"""
        with self._lock:
            self.dependencies[name] = DependencyHealth(name)

    def update_dependency(self, name: str, status: HealthStatus, 
                         details: Optional[Dict[str, Any]] = None) -> None:
        """Update the health status of a dependency"""
        with self._lock:
            if name not in self.dependencies:
                self.register_dependency(name)

            dep = self.dependencies[name]
            dep.status = status
            dep.last_checked = time.time()
            if details:
                dep.details = details

    async def run_checks(self) -> Dict[str, Any]:
        """Run all registered health checks"""
        results = {}

        for check in self.checks:
            try:
                # Run check with timeout
                loop = asyncio.get_event_loop()
                result = await asyncio.wait_for(
                    loop.run_in_executor(None, check.check_function),
                    timeout=check.timeout
                )
                results[check.name] = {
                    "status": HealthStatus.UP.value if result else HealthStatus.DOWN.value,
                    "importance": check.importance
                }
            except asyncio.TimeoutError:
                results[check.name] = {
                    "status": HealthStatus.DOWN.value,
                    "importance": check.importance,
                    "error": "Timeout"
                }
            except Exception as e:
                results[check.name] = {
                    "status": HealthStatus.DOWN.value,
                    "importance": check.importance,
                    "error": str(e)
                }

        return results

    async def get_health(self) -> Dict[str, Any]:
        """Get complete health information"""
        check_results = await self.run_checks()

        # Determine overall status
        status = HealthStatus.UP

        # Check for critical failures
        for name, result in check_results.items():
            if (result["importance"] == "critical" and 
                result["status"] == HealthStatus.DOWN.value):
                status = HealthStatus.DOWN
                break

        # Check for high importance degradation
        if status != HealthStatus.DOWN:
            for name, result in check_results.items():
                if (result["importance"] in ["critical", "high"] and 
                    result["status"] != HealthStatus.UP.value):
                    status = HealthStatus.DEGRADED
                    break

        # Build health response
        with self._lock:
            deps = {name: {"status": dep.status.value, "details": dep.details} 
                   for name, dep in self.dependencies.items()}

        return {
            "status": status.value,
            "service": self.service_name,
            "version": self.version,
            "uptime": time.time() - self.startup_time,
            "checks": check_results,
            "dependencies": deps
        }
Enter fullscreen mode Exit fullscreen mode

To use this with FastAPI:

from fastapi import FastAPI, HTTPException

app = FastAPI()
health_monitor = HealthMonitor("order-service", "1.0.0")

# Register health checks
health_monitor.register_check(
    "database", 
    lambda: db_pool.is_connected(),
    importance="critical"
)

health_monitor.register_check(
    "disk_space",
    lambda: check_disk_space() > 100_000_000,  # 100MB free
    importance="high"
)

# Update dependency status
health_monitor.update_dependency(
    "payment-service", 
    HealthStatus.UP,
    {"endpoint": "https://payment.example.com/api"} 
)

@app.get("/health")
async def health():
    """Basic health endpoint for load balancers"""
    health_info = await health_monitor.get_health()
    if health_info["status"] == HealthStatus.DOWN.value:
        raise HTTPException(status_code=503, detail="Service Unavailable")
    return {"status": "UP"}

@app.get("/health/details")
async def health_details():
    """Detailed health information for monitoring systems"""
    return await health_monitor.get_health()
Enter fullscreen mode Exit fullscreen mode

Sidecar Pattern

The Sidecar pattern deploys helper services alongside main services to handle cross-cutting concerns:


python
import os
import time
import signal
import subprocess
import threading
from typing import List, Dict, Any, Optional
import logging
import json

class SidecarProcess:
    def __init__(self, name: str, command: List[str], env: Optional[Dict[str, str]] = None):
        self.name = name
        self.command = command
        self.env = env or {}
        self.process = None
        self.stopped = False
        self.logger = logging.getLogger(f"sidecar.{name}")

    def start(self) -> bool:
        """Start the sidecar process"""
        try:
            # Merge current environment with sidecar-specific env
            process_

---
## 101 Books

**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.

Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon. 

Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!

## Our Creations

Be sure to check out our creations:

**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**

---

### We are on Medium

**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**

Enter fullscreen mode Exit fullscreen mode

Top comments (0)