# Monitoring & Observability

> Monitor agent performance, track metrics, and gain observability into your Swarms deployment

## Overview

Production multi-agent systems need monitoring to catch failures, slowdowns, and backlogs early. This guide covers logging with Loguru, agent-level monitoring, AOP queue statistics, performance and error tracking, health checks, and alerting for a Swarms deployment.

## Logging Architecture

### Loguru Integration

Swarms uses [Loguru](https://github.com/Delgan/loguru) for logging. The snippet below replaces Loguru's default handler with a colorized stderr handler at `INFO` level:

```python theme={null}
from loguru import logger
import sys

# Basic configuration
logger.remove()  # Remove default handler
logger.add(
    sys.stderr,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level="INFO",
    colorize=True,
)
```

### Multi-Level Logging

Configure different log levels for different outputs:

```python theme={null}
from loguru import logger
import sys

# Remove Loguru's default stderr handler so console output is not duplicated
logger.remove()

# Console: INFO and above
logger.add(
    sys.stderr,
    level="INFO",
    format="{time:HH:mm:ss} | {level} | {message}",
    colorize=True,
)

# File: All logs with rotation
logger.add(
    "logs/agent_{time}.log",
    rotation="500 MB",
    retention="10 days",
    compression="zip",
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
)

# Error file: ERROR level only (the filter below excludes CRITICAL; drop it to capture both)
logger.add(
    "logs/errors_{time}.log",
    rotation="100 MB",
    retention="30 days",
    compression="zip",
    level="ERROR",
    filter=lambda record: record["level"].name == "ERROR",
)

# JSON format for structured logging
logger.add(
    "logs/structured_{time}.json",
    rotation="1 day",
    serialize=True,  # JSON format
    level="INFO",
)
```

### Contextual Logging

Add context to logs for better traceability:

```python theme={null}
from loguru import logger
import contextvars

# Create context variables
request_id_var = contextvars.ContextVar('request_id', default='unknown')
user_id_var = contextvars.ContextVar('user_id', default='anonymous')

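# Configure logger to copy the context into each record's `extra` dict,
# where format strings can reach it as {extra[request_id]} and {extra[user_id]}
logger.configure(
    patcher=lambda record: record["extra"].update(
        request_id=request_id_var.get(),
        user_id=user_id_var.get(),
    )
)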

def process_request(request_id, user_id, task):
    # Set context
    request_id_var.set(request_id)
    user_id_var.set(user_id)
    
    logger.info(f"Processing task: {task}")
    try:
        result = agent.run(task)  # `agent` is a swarms Agent created elsewhere
        logger.info("Task completed successfully")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}")
        raise
```
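
One detail worth noting about the pattern above: `ContextVar.set()` leaves the new value in place after `process_request` returns, so a later log line in the same context can carry a stale request ID. A minimal sketch of one way to scope the values, assuming a hypothetical `request_context` helper (not part of Swarms or Loguru) built only on the standard library:

```python
import contextvars
from contextlib import contextmanager

request_id_var = contextvars.ContextVar("request_id", default="unknown")
user_id_var = contextvars.ContextVar("user_id", default="anonymous")


@contextmanager
def request_context(request_id: str, user_id: str):
    """Hypothetical helper: set both variables for the duration of a `with` block."""
    # set() returns a token that remembers the previous value
    request_token = request_id_var.set(request_id)
    user_token = user_id_var.set(user_id)
    try:
        yield
    finally:
        # Restore the previous values even if the body raised
        user_id_var.reset(user_token)
        request_id_var.reset(request_token)


with request_context("req-123", "user-42"):
    inside = (request_id_var.get(), user_id_var.get())

after = (request_id_var.get(), user_id_var.get())
```

Inside the block the variables hold the request's values; once the block exits they fall back to their defaults.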

## Agent-Level Monitoring

### Built-in Verbose Mode

```python theme={null}
from swarms import Agent

agent = Agent(
    agent_name="Monitored-Agent",
    model_name="claude-sonnet-4-6",
    verbose=True,  # Enable detailed logging
    print_on=True,  # Print to console
)

# Agent will log:
# - Task inputs
# - Tool executions  
# - LLM calls
# - Responses
# - Errors and retries
```

### Custom Monitoring Wrapper

```python theme={null}
import time
from loguru import logger
from typing import Any, Dict

class MonitoredAgent:
    def __init__(self, agent):
        self.agent = agent
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_duration": 0.0,
            "errors": [],
        }
    
    def run(self, task: str, *args, **kwargs) -> Any:
        start_time = time.time()
        self.metrics["total_requests"] += 1
        name = self.agent.agent_name

        # bind() attaches structured fields to the record's `extra` dict;
        # they appear in sinks that use serialize=True or {extra} in the format
        logger.bind(agent=name, task=task[:100]).info(
            f"Starting task for {name}"
        )

        try:
            result = self.agent.run(task, *args, **kwargs)
        except Exception as e:
            duration = time.time() - start_time
            # Count failed runs too, so avg_duration matches total_requests
            self.metrics["total_duration"] += duration
            self.metrics["failed_requests"] += 1
            self.metrics["errors"].append({
                "timestamp": time.time(),
                "error": str(e),
                "task": task[:100],
            })

            logger.bind(
                agent=name,
                error=str(e),
                duration=f"{duration:.2f}s",
            ).error("Task failed")
            raise

        duration = time.time() - start_time
        self.metrics["total_duration"] += duration
        self.metrics["successful_requests"] += 1

        logger.bind(
            agent=name,
            duration=f"{duration:.2f}s",
            success=True,
        ).info("Task completed successfully")

        return result
    
    def get_metrics(self) -> Dict:
        avg_duration = (
            self.metrics["total_duration"] / self.metrics["total_requests"]
            if self.metrics["total_requests"] > 0 else 0
        )
        
        success_rate = (
            self.metrics["successful_requests"] / self.metrics["total_requests"]
            if self.metrics["total_requests"] > 0 else 0
        )
        
        return {
            **self.metrics,
            "avg_duration": avg_duration,
            "success_rate": success_rate,
        }

# Usage
monitored_agent = MonitoredAgent(agent)
result = monitored_agent.run("Process this task")
metrics = monitored_agent.get_metrics()
```

## Queue Monitoring (AOP)

### Real-Time Queue Statistics

```python theme={null}
from loguru import logger
from swarms.structs.aop import AOP
import time
import threading

# Create AOP cluster (`agents` is a list of swarms Agent instances defined elsewhere)
deployer = AOP(
    server_name="MonitoredCluster",
    agents=agents,
    queue_enabled=True,
    verbose=True,
)

# Monitor queue stats in background
def monitor_queues():
    while True:
        stats = deployer.get_queue_stats()
        
        if stats["success"]:
            for agent_name, agent_stats in stats["stats"].items():
                # Attach the per-agent stats as structured fields
                logger.bind(**agent_stats).info(
                    f"Queue stats for {agent_name}"
                )
        
        time.sleep(60)  # Check every minute

monitor_thread = threading.Thread(target=monitor_queues, daemon=True)
monitor_thread.start()

deployer.run()
```

### Queue Metrics Dashboard

```python theme={null}
from rich.console import Console
from rich.table import Table
import time

console = Console()

def display_queue_dashboard(deployer):
    while True:
        # Clear console
        console.clear()
        
        # Create table
        table = Table(title="Agent Queue Statistics")
        table.add_column("Agent", style="cyan")
        table.add_column("Total", style="magenta")
        table.add_column("Completed", style="green")
        table.add_column("Failed", style="red")
        table.add_column("Pending", style="yellow")
        table.add_column("Processing", style="blue")
        table.add_column("Avg Time", style="white")
        table.add_column("Queue Size", style="white")
        
        # Get stats for all agents
        all_stats = deployer.get_queue_stats()
        
        if all_stats["success"]:
            for agent_name, stats in all_stats["stats"].items():
                table.add_row(
                    agent_name,
                    str(stats["total_tasks"]),
                    str(stats["completed_tasks"]),
                    str(stats["failed_tasks"]),
                    str(stats["pending_tasks"]),
                    str(stats["processing_tasks"]),
                    f"{stats['average_processing_time']:.2f}s",
                    str(stats["queue_size"]),
                )
        
        console.print(table)
        time.sleep(5)  # Update every 5 seconds

# Run dashboard in background
import threading
dashboard_thread = threading.Thread(
    target=display_queue_dashboard,
    args=(deployer,),
    daemon=True,
)
dashboard_thread.start()
```
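
The same statistics can also drive a simple backlog check instead of a visual dashboard. The sketch below assumes the dictionary shape used in the dashboard above (a `success` flag plus per-agent entries with a `pending_tasks` count); `find_backlogged_agents`, the `max_pending` limit, and the sample data are illustrative and not part of the AOP API.

```python
from typing import Any, Dict, List


def find_backlogged_agents(all_stats: Dict[str, Any], max_pending: int = 10) -> List[str]:
    """Return the names of agents whose pending task count exceeds max_pending."""
    if not all_stats.get("success"):
        return []
    return sorted(
        name
        for name, stats in all_stats.get("stats", {}).items()
        if stats.get("pending_tasks", 0) > max_pending
    )


# Hand-written sample in the assumed shape, not real AOP output
sample_stats = {
    "success": True,
    "stats": {
        "researcher": {"pending_tasks": 25, "queue_size": 25},
        "writer": {"pending_tasks": 2, "queue_size": 2},
    },
}

backlogged = find_backlogged_agents(sample_stats, max_pending=10)
```

In a deployment, the argument would be the result of `deployer.get_queue_stats()` and the returned names could be logged or passed to an alert handler.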

## Performance Metrics

### Response Time Tracking

```python theme={null}
import time
from collections import deque
import statistics

from loguru import logger

class PerformanceTracker:
    def __init__(self, window_size=100):
        self.response_times = deque(maxlen=window_size)
        self.request_count = 0
    
    def record_request(self, duration: float):
        self.response_times.append(duration)
        self.request_count += 1
    
    def get_stats(self):
        if not self.response_times:
            return None
        
        return {
            "count": len(self.response_times),
            "total_requests": self.request_count,
            "mean": statistics.mean(self.response_times),
            "median": statistics.median(self.response_times),
            "min": min(self.response_times),
            "max": max(self.response_times),
            "stdev": statistics.stdev(self.response_times) if len(self.response_times) > 1 else 0,
        }

tracker = PerformanceTracker()

def timed_agent_run(agent, task):
    start = time.time()
    try:
        result = agent.run(task)
        return result
    finally:
        duration = time.time() - start
        tracker.record_request(duration)

# Get stats
stats = tracker.get_stats()
logger.info(f"Performance stats: {stats}")
```
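
Mean and median can hide slow outliers, which is why tail latencies such as p95 and p99 are usually tracked as well. A minimal sketch using the standard library's `statistics.quantiles`; the `latency_percentiles` helper is hypothetical and could be fed a window of durations such as `tracker.response_times`:

```python
import statistics
from typing import Dict, Optional, Sequence


def latency_percentiles(durations: Sequence[float]) -> Optional[Dict[str, float]]:
    """Return p50/p95/p99 for a sequence of durations, or None if too few samples."""
    # statistics.quantiles needs at least two data points
    if len(durations) < 2:
        return None
    # n=100 yields 99 cut points; index k-1 is the k-th percentile.
    # method="inclusive" keeps results within the observed min and max.
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


sample_durations = [float(i) for i in range(1, 101)]  # 1.0 through 100.0 seconds
percentiles = latency_percentiles(sample_durations)
```

With a small window the upper percentiles are coarse estimates, so a larger `window_size` gives more stable tail numbers.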

### Throughput Monitoring

```python theme={null}
import time
from collections import deque

from loguru import logger

class ThroughputMonitor:
    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self.requests = deque()
    
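    def _prune(self, now: float):
        # Drop timestamps that have aged out of the window
        cutoff = now - self.window_seconds
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def record_request(self):
        now = time.time()
        self.requests.append(now)
        self._prune(now)

    def get_throughput(self):
        # Prune on read as well, so the rate decays when traffic stops
        self._prune(time.time())
        return len(self.requests) / self.window_seconds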
    
    def get_stats(self):
        throughput = self.get_throughput()
        return {
            "throughput_per_second": throughput,
            "throughput_per_minute": throughput * 60,
            "requests_in_window": len(self.requests),
            "window_seconds": self.window_seconds,
        }

monitor = ThroughputMonitor(window_seconds=60)

# Record each request (`agent` and `tasks` are defined elsewhere)
for task in tasks:
    result = agent.run(task)
    monitor.record_request()

# Get current throughput
stats = monitor.get_stats()
logger.info(f"Current throughput: {stats['throughput_per_minute']:.2f} req/min")
```

## Error Tracking

### Error Rate Monitoring

```python theme={null}
from collections import Counter
import time

class ErrorTracker:
    def __init__(self):
        self.total_requests = 0
        self.errors = []
        self.error_types = Counter()
    
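    def record_error(self, error: Exception, context: dict = None):
        # A failed request is still a request: count it so that
        # error_rate is errors / all requests, not errors / successes
        self.total_requests += 1
        error_record = {
            "timestamp": time.time(),
            "error_type": type(error).__name__,
            "message": str(error),
            "context": context or {},
        }
        self.errors.append(error_record)
        self.error_types[type(error).__name__] += 1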
    
    def record_success(self):
        self.total_requests += 1
    
    def get_error_rate(self):
        if self.total_requests == 0:
            return 0.0
        return len(self.errors) / self.total_requests
    
    def get_stats(self):
        return {
            "total_requests": self.total_requests,
            "total_errors": len(self.errors),
            "error_rate": self.get_error_rate(),
            "error_types": dict(self.error_types),
            "recent_errors": self.errors[-10:],  # Last 10 errors
        }

error_tracker = ErrorTracker()

def tracked_run(agent, task):
    try:
        result = agent.run(task)
        error_tracker.record_success()
        return result
    except Exception as e:
        error_tracker.record_error(e, {"task": task[:100], "agent": agent.agent_name})
        raise
```
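
`ErrorTracker` reports a cumulative rate over the life of the process, so a recent burst of failures can be diluted by a long history of successes. One possible complement is a rate over only the most recent outcomes. The `RollingErrorRate` class below is an illustrative sketch, not a Swarms component:

```python
from collections import deque


class RollingErrorRate:
    """Error rate over the most recent `window_size` request outcomes."""

    def __init__(self, window_size: int = 100):
        # Each entry is True for a failed request, False for a success;
        # the deque discards the oldest entry once it is full
        self.outcomes = deque(maxlen=window_size)

    def record(self, failed: bool) -> None:
        self.outcomes.append(failed)

    def rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)


rolling = RollingErrorRate(window_size=4)
for failed in [False, False, False, False, True, True]:
    rolling.record(failed)

# Only the last four outcomes (two successes, two failures) are counted
current_rate = rolling.rate()
```

Calling `rolling.record(False)` next to `record_success()` and `rolling.record(True)` next to `record_error()` keeps both views up to date.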

## Health Checks

### Agent Health Check

```python theme={null}
import time
from typing import Dict, Any

from loguru import logger

def health_check(agent) -> Dict[str, Any]:
    """
    Perform comprehensive health check on agent.
    """
    try:
        # Quick test run
        start = time.time()
        result = agent.run("Hello", max_loops=1)
        duration = time.time() - start
        
        return {
            "status": "healthy",
            "agent_name": agent.agent_name,
            "model": agent.model_name,
            "response_time": duration,
            "timestamp": time.time(),
            "checks": {
                "llm_responsive": True,
                "response_valid": result is not None,
            }
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "agent_name": agent.agent_name,
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time(),
        }

# Periodic health checks
import threading

def periodic_health_check(agents, interval=60):
    def check():
        while True:
            for agent in agents:
                status = health_check(agent)
                if status["status"] == "unhealthy":
                    logger.error(f"Agent {agent.agent_name} is unhealthy: {status}")
                else:
                    logger.info(f"Agent {agent.agent_name} is healthy")
            time.sleep(interval)
    
    thread = threading.Thread(target=check, daemon=True)
    thread.start()
```
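
A single failed check can be transient (a timeout or a rate limit, for instance), so it may be worth requiring several consecutive failures before treating an agent as unhealthy. The `FailureStreak` class below is a hypothetical sketch of that idea; the threshold of three is an arbitrary starting point.

```python
class FailureStreak:
    """Report "unhealthy" only after `threshold` consecutive failed checks."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.consecutive_failures = 0

    def update(self, check_passed: bool) -> str:
        if check_passed:
            # Any passing check resets the streak
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1
        if self.consecutive_failures >= self.threshold:
            return "unhealthy"
        return "healthy"


streak = FailureStreak(threshold=3)
states = [streak.update(ok) for ok in [True, False, False, False, True]]
```

In the periodic loop above, one `FailureStreak` per agent could be updated with `status["status"] == "healthy"` after each check.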

### System Health Check

```python theme={null}
import time
from typing import Any, Dict

import psutil

def system_health_check() -> Dict[str, Any]:
    """
    Check system resource health.
    """
    cpu_percent = psutil.cpu_percent(interval=1)  # Blocks for 1 second while sampling
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    # Collect only the warnings that actually apply
    warnings = []
    if cpu_percent >= 90:
        warnings.append(f"High CPU usage: {cpu_percent}%")
    if memory.percent >= 85:
        warnings.append(f"High memory usage: {memory.percent}%")
    if disk.percent >= 90:
        warnings.append(f"Low disk space: {disk.percent}% used")

    return {
        # Status follows the warnings list, so the two can never disagree
        "status": "warning" if warnings else "healthy",
        "cpu_percent": cpu_percent,
        "memory_percent": memory.percent,
        "disk_percent": disk.percent,
        "timestamp": time.time(),
        "warnings": warnings,
    }
```

## Alerting

### Simple Alert System

```python theme={null}
import time
from enum import Enum
from typing import Callable, List

from loguru import logger

class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertManager:
    def __init__(self):
        self.handlers: List[Callable] = []
    
    def add_handler(self, handler: Callable):
        self.handlers.append(handler)
    
    def alert(self, level: AlertLevel, message: str, context: dict = None):
        alert_data = {
            "level": level.value,
            "message": message,
            "context": context or {},
            "timestamp": time.time(),
        }
        
        for handler in self.handlers:
            try:
                handler(alert_data)
            except Exception as e:
                logger.error(f"Alert handler failed: {e}")

# Alert handlers
def log_alert(alert_data):
    level = alert_data["level"]
    message = alert_data["message"]
    
    if level == "critical":
        logger.critical(message)
    elif level == "error":
        logger.error(message)
    elif level == "warning":
        logger.warning(message)
    else:
        logger.info(message)

def email_alert(alert_data):
    # Implement email notification
    pass

# Setup alerts
alert_manager = AlertManager()
alert_manager.add_handler(log_alert)
# alert_manager.add_handler(email_alert)

# Use alerts (`error_rate` comes from your own tracking, e.g. ErrorTracker.get_error_rate())
if error_rate > 0.1:
    alert_manager.alert(
        AlertLevel.ERROR,
        "High error rate detected",
        {"error_rate": error_rate}
    )
```
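
When a condition like the error-rate check runs on every request or polling cycle, the same alert can fire many times in a row. One common mitigation is a per-alert cooldown. The `AlertCooldown` class below is an illustrative sketch that is independent of `AlertManager`; the injectable `clock` parameter is an assumption added to make the behavior easy to test.

```python
import time
from typing import Callable, Dict


class AlertCooldown:
    """Suppress repeats of the same alert key within `cooldown_seconds`."""

    def __init__(
        self,
        cooldown_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.last_sent: Dict[str, float] = {}

    def should_send(self, key: str) -> bool:
        now = self.clock()
        last = self.last_sent.get(key)
        if last is not None and now - last < self.cooldown_seconds:
            return False  # Still cooling down; skip this alert
        self.last_sent[key] = now
        return True


# A fake clock keeps the example deterministic
fake_now = [0.0]
cooldown = AlertCooldown(cooldown_seconds=60.0, clock=lambda: fake_now[0])

first = cooldown.should_send("high_error_rate")
fake_now[0] = 30.0
second = cooldown.should_send("high_error_rate")
fake_now[0] = 90.0
third = cooldown.should_send("high_error_rate")
```

Here the first and third calls are allowed and the second is suppressed. In practice the check would wrap the `alert_manager.alert(...)` call, keyed by a stable name for the condition.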

## Best Practices

### 1. Use Structured Logging

```python theme={null}
# Good: Structured (bind() puts the fields in the record's `extra` dict)
logger.bind(
    agent=agent.agent_name,
    duration=duration,
    success=True,
).info("Task completed")

# Avoid: Unstructured
logger.info(f"Task completed by {agent.agent_name} in {duration}s")
```

### 2. Log at Appropriate Levels

* **DEBUG**: Detailed diagnostic information for troubleshooting
* **INFO**: Normal operational events, such as task start and completion
* **WARNING**: Recoverable issues, such as a retry that later succeeds
* **ERROR**: Failures, such as a task that raised an exception
* **CRITICAL**: Failures requiring immediate attention

### 3. Include Context

```python theme={null}
logger.bind(
    request_id=request_id,
    user_id=user_id,
    agent=agent.agent_name,
    task_length=len(task),
).info("Processing request")
```

### 4. Monitor Key Metrics

* Response time (mean, median, p95, p99)
* Throughput (requests per second/minute)
* Error rate
* Queue sizes (if using AOP)
* Resource usage (CPU, memory)

### 5. Set Up Alerts

* High error rates
* Slow response times
* Resource exhaustion
* Queue backlogs
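
The conditions listed above can be expressed as a small table of thresholds checked against a metrics snapshot. This is a sketch only: the metric names and limits in `THRESHOLDS` are hypothetical placeholders to tune for your workload, and `breached_thresholds` is not part of Swarms.

```python
from typing import Dict, List

# Hypothetical limits; adjust to your own latency and capacity targets
THRESHOLDS: Dict[str, float] = {
    "error_rate": 0.10,      # High error rates
    "p95_seconds": 30.0,     # Slow response times
    "cpu_percent": 90.0,     # Resource exhaustion
    "pending_tasks": 50,     # Queue backlogs
}


def breached_thresholds(metrics: Dict[str, float]) -> List[str]:
    """Return a message for every metric that exceeds its threshold."""
    messages = []
    for name, limit in THRESHOLDS.items():
        value = metrics.get(name)
        # Metrics missing from the snapshot are skipped rather than treated as zero
        if value is not None and value > limit:
            messages.append(f"{name}={value} exceeds {limit}")
    return messages


snapshot = {"error_rate": 0.25, "p95_seconds": 12.0, "cpu_percent": 95.0}
breaches = breached_thresholds(snapshot)
```

Each returned message could then be passed to an alert handler, one alert per breached metric.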

## Related Resources

* [Agent Orchestration Protocol](/deployment/agent-orchestration-protocol)
* [Production Best Practices](/deployment/production-best-practices)
* [Scaling Guide](/deployment/scaling)
* [Loguru Documentation](https://loguru.readthedocs.io/)
