> ## Documentation Index
> Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
> Use this file to discover all available pages before exploring further.

# Production Best Practices

> Essential practices for deploying Swarms agents in production environments

## Overview

Deploying multi-agent systems in production requires careful attention to error handling, retries, logging, monitoring, and system design. This guide covers the essential best practices for running Swarms agents reliably at scale.

## Error Handling

### Agent Error Hierarchy

Swarms provides a comprehensive exception hierarchy for different failure modes:

```python theme={null}
# Imports needed to run this example on its own: the logger used in the
# handlers, the Agent class, and the exception types being caught.
from loguru import logger

from swarms import Agent
from swarms.structs.agent import (
    AgentError,  # Base exception
    AgentInitializationError,  # Initialization failures
    AgentRunError,  # Runtime failures
    AgentLLMError,  # LLM-related errors
    AgentToolError,  # Tool execution errors
    AgentMemoryError,  # Memory errors
    AgentToolExecutionError,  # Tool execution failures
)

# Handlers are listed from most specific to most general so that the
# catch-all `Exception` clause only sees errors no earlier clause matched.
try:
    agent = Agent(
        agent_name="Production-Agent",
        model_name="claude-sonnet-4-6",
        max_loops=3,
    )
    result = agent.run("Process this task")
except AgentInitializationError as e:
    logger.error(f"Failed to initialize agent: {e}")
    # Handle initialization failure (e.g., retry with different config)
except AgentLLMError as e:
    logger.error(f"LLM error: {e}")
    # Handle LLM failures (e.g., switch to fallback model)
except AgentToolError as e:
    logger.error(f"Tool error: {e}")
    # Handle tool failures (e.g., disable problematic tool)
except AgentRunError as e:
    logger.error(f"Runtime error: {e}")
    # Handle general runtime errors
except Exception as e:
    logger.error(f"Unexpected error: {e}")
    # Catch-all for unexpected errors
```

### Graceful Error Recovery

```python theme={null}
import time
import traceback

from loguru import logger

def run_agent_with_recovery(agent, task, max_retries=3):
    """
    Run agent with automatic recovery from transient errors.

    Calls ``agent.run(task)`` and retries when it raises ``AgentLLMError``
    or ``AgentRunError``, sleeping ``2 ** attempt`` seconds between
    attempts (1s, 2s, 4s, ...). Any other exception is logged together
    with its traceback and re-raised immediately without retrying.

    Args:
        agent: Object exposing ``run(task)``.
        task: Task passed unchanged to ``agent.run``.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The value returned by the first successful ``agent.run(task)``.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        AgentLLMError, AgentRunError: Re-raised after the final attempt fails.
        Exception: Any other error from ``agent.run`` is re-raised as-is.
    """
    if max_retries < 1:
        # Without this guard the loop body never executes and the function
        # would return None without ever calling the agent.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            return agent.run(task)
        except (AgentLLMError, AgentRunError) as e:
            logger.warning(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                logger.error(f"All {max_retries} attempts failed")
                raise
            delay = 2 ** attempt  # Exponential backoff
            logger.info(f"Retrying in {delay} seconds...")
            time.sleep(delay)
        except Exception as e:
            # Not a known transient failure: record the details and give up.
            logger.error(f"Unexpected error: {e}")
            logger.error(traceback.format_exc())
            raise
```

## Retry Strategies

### Built-in Retry Configuration

```python theme={null}
from swarms import Agent

agent = Agent(
    agent_name="Resilient-Agent",
    model_name="claude-sonnet-4-6",
    retry_attempts=5,  # Number of retry attempts
    retry_interval=2,  # Initial retry interval (seconds)
    max_loops=3,
)
```

### Exponential Backoff

```python theme={null}
import time
import random

def exponential_backoff_retry(
    func,
    max_retries=5,
    base_delay=1,
    max_delay=60,
    jitter=True,
    retry_on=(Exception,),
):
    """
    Execute function with exponential backoff retry.

    Args:
        func: Zero-argument callable to execute.
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Delay in seconds before the first retry; doubles after
            every failed attempt.
        max_delay: Upper bound in seconds for any single delay, including
            the jitter.
        jitter: When true, scale each delay by a random factor in
            [0.5, 1.5) so concurrent callers do not retry in lockstep.
        retry_on: Exception class or tuple of classes that trigger a retry.
            Anything else propagates immediately.

    Returns:
        The value returned by the first successful ``func()`` call.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The error from the final attempt, or any error not
            matched by ``retry_on``.
    """
    if max_retries < 1:
        # Otherwise the loop never runs and None is returned silently.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            return func()
        except retry_on as e:
            if attempt == max_retries - 1:
                raise

            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                # Re-apply the cap after jitter; the random factor can be
                # up to 1.5 and would otherwise exceed max_delay.
                delay = min(delay * (0.5 + random.random()), max_delay)

            logger.warning(
                f"Attempt {attempt + 1} failed: {e}. "
                f"Retrying in {delay:.2f}s..."
            )
            time.sleep(delay)

# Usage
result = exponential_backoff_retry(
    lambda: agent.run("Complex task"),
    max_retries=5,
    base_delay=1,
    max_delay=60,
)
```

### Fallback Models

Use fallback models for resilience:

```python theme={null}
agent = Agent(
    agent_name="Resilient-Agent",
    fallback_models=[
        "claude-sonnet-4-6",           # Primary model
        "gpt-5.4",      # First fallback
        "gpt-3.5-turbo",    # Second fallback
    ],
    max_loops=3,
)

# Agent automatically tries fallback models if primary fails
result = agent.run("Generate a report")
```

## Logging

### Structured Logging with Loguru

Swarms uses Loguru for powerful, structured logging:

```python theme={null}
from loguru import logger
import sys

# Configure logging for production
logger.remove()  # Remove default handler
logger.add(
    sys.stderr,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level="INFO",
    colorize=True,
)

# Add file logging with rotation
logger.add(
    "logs/agent_{time}.log",
    rotation="500 MB",  # Rotate when file reaches 500MB
    retention="10 days",  # Keep logs for 10 days
    compression="zip",  # Compress rotated logs
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
)

# Add a dedicated error log file.
# `level` is a minimum-severity threshold, so this sink receives ERROR and
# CRITICAL records. No extra filter is applied: matching only the exact
# level name "ERROR" would drop CRITICAL records from the error log.
logger.add(
    "logs/errors_{time}.log",
    rotation="100 MB",
    retention="30 days",
    compression="zip",
    level="ERROR",
)
```

### Agent-Specific Logging

```python theme={null}
# Enable verbose logging for specific agents
agent = Agent(
    agent_name="Debug-Agent",
    model_name="claude-sonnet-4-6",
    verbose=True,  # Enable verbose logging
    print_on=True,  # Print outputs to console
)

# Control logging in AOP
from swarms.structs.aop import AOP

deployer = AOP(
    server_name="ProductionCluster",
    verbose=True,
    traceback_enabled=True,  # Enable detailed tracebacks
    log_level="INFO",  # Set log level
)
```

### Contextual Logging

```python theme={null}
from loguru import logger
import contextvars

# Create context var for request ID
request_id_var = contextvars.ContextVar('request_id', default='unknown')

# Add request ID to all logs.
# Custom fields belong in the record's "extra" dict; a sink can then show
# the value by including {extra[request_id]} in its format string.
logger.configure(
    patcher=lambda record: record["extra"].update(
        request_id=request_id_var.get()
    )
)

def process_request(request_id, task):
    """
    Run ``task`` on the agent while ``request_id_var`` holds ``request_id``.

    Args:
        request_id: Identifier stored in ``request_id_var`` for the
            duration of the call.
        task: Task passed unchanged to ``agent.run``.

    Returns:
        The value returned by ``agent.run(task)``.

    Raises:
        Exception: Any error from ``agent.run`` is logged and re-raised.
    """
    # Keep the token so the previous value can be restored afterwards;
    # otherwise this request's ID would stay attached to later, unrelated
    # work running in the same context.
    token = request_id_var.set(request_id)
    try:
        logger.info(f"Processing task: {task}")
        result = agent.run(task)
        logger.info("Task completed successfully")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}")
        raise
    finally:
        request_id_var.reset(token)
```

## Monitoring

### Performance Metrics

```python theme={null}
import time
from loguru import logger

class AgentMetrics:
    """In-memory counters for agent requests and their cumulative duration."""

    def __init__(self):
        # All counters start at zero; the duration is accumulated as a float.
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.total_duration = 0.0

    def record_request(self, success: bool, duration: float):
        """Count one finished request and add its duration to the total."""
        self.total_requests += 1
        if not success:
            self.failed_requests += 1
        else:
            self.successful_requests += 1
        self.total_duration += duration

    def get_metrics(self):
        """Return the counters plus the derived success rate and mean duration.

        Both derived values are 0 when no request has been recorded yet.
        """
        count = self.total_requests
        if count > 0:
            avg_duration = self.total_duration / count
            success_rate = self.successful_requests / count
        else:
            avg_duration = success_rate = 0

        snapshot = {
            "total_requests": count,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
        }
        return snapshot

metrics = AgentMetrics()

def monitored_agent_run(agent, task):
    """
    Run ``task`` on ``agent`` and record the outcome in ``metrics``.

    The outcome and elapsed time are recorded whether the run succeeds or
    raises.

    Args:
        agent: Object exposing ``run(task)``.
        task: Task passed unchanged to ``agent.run``.

    Returns:
        The value returned by ``agent.run(task)``.

    Raises:
        Exception: Any error from ``agent.run`` is logged and re-raised.
    """
    # perf_counter() is monotonic, so the measured interval cannot be
    # distorted by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    success = False

    try:
        result = agent.run(task)
        success = True
        return result
    except Exception as e:
        logger.error(f"Agent run failed: {e}")
        raise
    finally:
        duration = time.perf_counter() - start_time
        metrics.record_request(success, duration)
        logger.info(f"Request completed in {duration:.2f}s")
```

### Health Checks

```python theme={null}
def health_check(agent):
    """
    Perform health check on agent.

    Sends a short prompt through ``agent.run`` and reports whether the call
    completed. Errors raised by the run are captured in the returned dict
    instead of propagating.

    Args:
        agent: Object exposing ``run``, ``agent_name`` and ``model_name``.

    Returns:
        A dict with ``status`` ("healthy" or "unhealthy"), ``agent_name``,
        ``model`` and ``timestamp`` (seconds since the epoch). Unhealthy
        results also carry ``error`` with the exception message.
    """
    try:
        # Quick test run; only success or failure matters, so the output
        # is discarded.
        agent.run("Hello", max_loops=1)
        return {
            "status": "healthy",
            "agent_name": agent.agent_name,
            "model": agent.model_name,
            "timestamp": time.time(),
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "agent_name": agent.agent_name,
            # Same key as the healthy payload so consumers see one schema;
            # getattr avoids raising a second error while reporting one.
            "model": getattr(agent, "model_name", None),
            "error": str(e),
            "timestamp": time.time(),
        }

# Periodic health checks
import threading

def periodic_health_check(agent, interval=60):
    """
    Start a daemon thread that logs a health check of ``agent`` in a loop.

    Args:
        agent: Agent passed to ``health_check`` on every iteration.
        interval: Seconds to sleep between consecutive checks.
    """
    def check():
        # Runs until the process exits: check, log, wait, repeat.
        while True:
            report = health_check(agent)
            logger.info(f"Health check: {report}")
            time.sleep(interval)

    worker = threading.Thread(target=check, daemon=True)
    worker.start()
```

## Configuration Management

### Environment-Based Configuration

```python theme={null}
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Production configuration
PROD_CONFIG = {
    "model_name": os.getenv("MODEL_NAME", "claude-sonnet-4-6"),
    "max_loops": int(os.getenv("MAX_LOOPS", "3")),
    "retry_attempts": int(os.getenv("RETRY_ATTEMPTS", "5")),
    "timeout": int(os.getenv("TIMEOUT", "120")),
    "verbose": os.getenv("VERBOSE", "false").lower() == "true",
}

agent = Agent(
    agent_name="Production-Agent",
    **PROD_CONFIG,
)
```

### Configuration Validation

```python theme={null}
from pydantic import BaseModel, Field, validator

class AgentConfig(BaseModel):
    # Agent settings checked at construction time: a non-empty name, bounded
    # loop/retry counts, a positive timeout and an allow-listed model.
    agent_name: str = Field(..., min_length=1)
    model_name: str
    max_loops: int = Field(default=3, ge=1, le=100)
    retry_attempts: int = Field(default=3, ge=0, le=10)
    timeout: int = Field(default=120, ge=1)

    @validator("model_name")
    def validate_model(cls, v):
        # Accept only models on the allow-list; anything else is rejected.
        allowed_models = ["claude-sonnet-4-6", "gpt-5.4", "claude-sonnet-3.5"]
        if v in allowed_models:
            return v
        raise ValueError(f"Model must be one of {allowed_models}")

# Load and validate config
config = AgentConfig(
    agent_name="Production-Agent",
    model_name="claude-sonnet-4-6",
    max_loops=5,
)

agent = Agent(**config.dict())
```

## Security Best Practices

### API Key Management

```python theme={null}
import os
from cryptography.fernet import Fernet

# Never hardcode API keys: read them from environment variables or a
# secure vault.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")

# Fail fast with a clear message instead of passing None to the agent and
# getting an authentication error on the first request.
if not ANTHROPIC_API_KEY:
    raise RuntimeError("ANTHROPIC_API_KEY environment variable is not set")

# The key must belong to the provider of the selected model: a Claude
# model needs the Anthropic key, not the OpenAI one.
agent = Agent(
    model_name="claude-sonnet-4-6",
    llm_api_key=ANTHROPIC_API_KEY,
)
```

### Input Validation

```python theme={null}
import re

def sanitize_input(text: str, max_length: int = 10000) -> str:
    """
    Clean up untrusted task text before passing it to an agent.

    Removes control characters (keeping tab, newline and carriage return so
    multi-line tasks stay readable), trims surrounding whitespace and caps
    the length. This is basic input hygiene only; it does not by itself
    stop prompt-injection content.

    Args:
        text: Raw user input. ``None`` is treated as empty input.
        max_length: Maximum number of characters to keep.

    Returns:
        The cleaned text, at most ``max_length`` characters long.
    """
    if text is None:
        # e.g. request.get("task") when the field is missing
        return ""

    # Remove C0/C1 control characters except \t (\x09), \n (\x0a) and
    # \r (\x0d); deleting those would glue neighbouring words and lines
    # together.
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)

    # Trim first so leading whitespace does not use up the length budget.
    text = text.strip()

    # Limit length
    if len(text) > max_length:
        # The cut can land right after whitespace, so trim the tail again.
        text = text[:max_length].rstrip()

    return text

# Use sanitized input
user_task = sanitize_input(request.get("task"))
result = agent.run(user_task)
```

### Safety Prompts

```python theme={null}
agent = Agent(
    agent_name="Safe-Agent",
    model_name="claude-sonnet-4-6",
    safety_prompt_on=True,  # Enable safety guardrails
)
```

## Resource Management

### Memory Management

```python theme={null}
# Truncate conversation history to manage memory
agent = Agent(
    agent_name="Memory-Managed-Agent",
    model_name="claude-sonnet-4-6",
    context_length=8000,  # Context window size
)

# Manual truncation
agent.truncate_history(max_messages=50)
```

### Connection Pooling

```python theme={null}
from concurrent.futures import ThreadPoolExecutor

# Use thread pool for concurrent requests
executor = ThreadPoolExecutor(max_workers=10)

def process_tasks(tasks):
    """
    Run every task through ``agent.run`` on the shared thread pool.

    All tasks are submitted before any result is awaited. Results come back
    in the same order as ``tasks``.
    """
    pending = [executor.submit(agent.run, item) for item in tasks]
    return [job.result() for job in pending]
```

## State Management

### Autosave

```python theme={null}
agent = Agent(
    agent_name="Stateful-Agent",
    model_name="claude-sonnet-4-6",
    autosave=True,  # Auto-save state after each run
    saved_state_path="./agent_states/",
)
```

### Manual State Management

```python theme={null}
# Save state
agent.save_state()

# Load state
from swarms import Agent

agent = Agent(
    agent_name="Restored-Agent",
    model_name="claude-sonnet-4-6",
    load_state_path="./agent_states/agent_state.json",
)
```

## Testing

### Unit Testing

```python theme={null}
import unittest
from swarms import Agent

class TestAgent(unittest.TestCase):
    # Smoke tests for a single-loop agent.

    def setUp(self):
        # Build a fresh agent before every test method.
        self.agent = Agent(
            agent_name="Test-Agent",
            model_name="gpt-5.4",
            max_loops=1,
        )

    def test_basic_run(self):
        # A simple prompt should yield a reply that mentions "hello".
        reply = self.agent.run("Say hello")
        self.assertIsNotNone(reply)
        self.assertIn("hello", reply.lower())

    def test_error_handling(self):
        # Empty task should fail
        self.assertRaises(AgentRunError, self.agent.run, "")
```

### Integration Testing

```python theme={null}
def test_agent_integration():
    """Run a two-loop agent on several tasks and expect non-empty output."""
    agent = Agent(
        agent_name="Integration-Test-Agent",
        model_name="gpt-5.4",
        max_loops=2,
    )

    # Inputs of increasing difficulty, each run on the same agent
    prompts = (
        "Simple task",
        "Multi-step reasoning task",
        "Task requiring tool use",
    )

    for prompt in prompts:
        output = agent.run(prompt)
        assert output is not None
        assert len(output) > 0
```

## Related Resources

* [Agent Orchestration Protocol](/deployment/agent-orchestration-protocol)
* [Scaling Guide](/deployment/scaling)
* [Monitoring Guide](/deployment/monitoring)
* [Agent API Reference](https://docs.swarms.world/en/latest/swarms/structs/agent/)
