Documentation Index
Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The Agent Orchestration Protocol (AOP) is a powerful framework for deploying and managing agents as distributed services. AOP enables agents to be discovered, managed, and executed through a standardized protocol built on the Model Context Protocol (MCP), making it perfect for building scalable, production-ready multi-agent systems.
Key Features
- Distributed Deployment: Deploy agents as independent services accessible via MCP
- Service Discovery: Automatic agent registration and discovery
- Queue-Based Execution: Built-in task queuing for reliability and performance
- Monitoring & Stats: Real-time queue statistics and performance metrics
- Persistence Mode: Automatic restart and failsafe protection
- Network Resilience: Connection monitoring and automatic retry capabilities
Architecture
AOP transforms agents into MCP tools that can be:
- Accessed remotely via HTTP or other transports
- Discovered through standardized protocols
- Managed centrally with queue-based task execution
- Monitored for performance and health
┌─────────────────────────────────────────────┐
│ AOP Server (MCP) │
├─────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Agent 1 │ │ Agent 2 │ │
│ │ (as Tool) │ │ (as Tool) │ ... │
│ └─────────────┘ └─────────────┘ │
│ ↓ ↓ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Task Queue │ │ Task Queue │ │
│ │ + Workers │ │ + Workers │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────┘
Basic Usage
Creating an AOP Server
from swarms import Agent
from swarms.structs.aop import AOP
# Create specialized agents
research_agent = Agent(
agent_name="Research-Agent",
agent_description="Expert in research and data collection",
model_name="claude-sonnet-4-6",
max_loops=1,
)
analysis_agent = Agent(
agent_name="Analysis-Agent",
agent_description="Expert in data analysis and insights",
model_name="claude-sonnet-4-6",
max_loops=1,
)
# Create AOP server with queue-based execution
deployer = AOP(
server_name="ResearchCluster",
description="A cluster of research and analysis agents",
port=8000,
verbose=True,
queue_enabled=True, # Enable task queuing
max_workers_per_agent=2, # Concurrent workers per agent
)
# Add agents to the server
deployer.add_agent(
agent=research_agent,
tool_name="research_tool",
tool_description="Research and data collection tool",
timeout=30,
max_retries=3
)
deployer.add_agent(
agent=analysis_agent,
tool_name="analysis_tool",
tool_description="Data analysis and insights tool",
timeout=30,
max_retries=3
)
# Start the server
deployer.run()
Adding Multiple Agents in Batch
from swarms import Agent
from swarms.structs.aop import AOP
# Create multiple agents
agents = [
Agent(
agent_name=f"Worker-{i}",
agent_description=f"Specialized worker agent {i}",
model_name="claude-sonnet-4-6",
max_loops=1,
)
for i in range(5)
]
# Create AOP server
deployer = AOP(
server_name="WorkerCluster",
agents=agents, # Pass agents at initialization
port=8000,
queue_enabled=True,
)
# Or add them after initialization
# deployer.add_agents_batch(
# agents=agents,
# tool_names=[f"worker_{i}" for i in range(5)],
# timeouts=[30] * 5,
# max_retries_list=[3] * 5,
# )
deployer.run()
Task Queue System
AOP includes a powerful task queue system for reliable, scalable task execution.
Queue Features
- Priority-based queuing: Tasks with higher priority execute first
- Automatic retries: Failed tasks retry automatically with backoff
- Worker threads: Multiple workers process tasks concurrently
- Task tracking: Monitor task status (pending, processing, completed, failed)
- Statistics: Real-time metrics on queue performance
Task Lifecycle
from swarms.structs.aop import TaskStatus
# Task flows through these states:
# PENDING -> PROCESSING -> COMPLETED
# -> FAILED (with retries)
# -> CANCELLED
Queue Configuration
deployer = AOP(
server_name="ProductionCluster",
queue_enabled=True,
max_workers_per_agent=5, # 5 concurrent workers
max_queue_size_per_agent=1000, # Queue capacity
processing_timeout=60, # Task timeout in seconds
retry_delay=1.0, # Delay between retries
)
Monitoring and Statistics
Getting Queue Statistics
# Get stats for a specific agent
stats = deployer.get_queue_stats(tool_name="research_tool")
print(stats)
# {
# "success": True,
# "agent_name": "research_tool",
# "stats": {
# "total_tasks": 150,
# "completed_tasks": 142,
# "failed_tasks": 3,
# "pending_tasks": 5,
# "processing_tasks": 2,
# "average_processing_time": 2.34,
# "queue_size": 5,
# "queue_status": "running"
# }
# }
# Get stats for all agents
all_stats = deployer.get_queue_stats()
Agent Discovery
# List all registered agents
agent_names = deployer.list_agents()
print(f"Registered agents: {agent_names}")
# Get detailed info about an agent
info = deployer.get_agent_info("research_tool")
print(info)
# {
# "tool_name": "research_tool",
# "agent_name": "Research-Agent",
# "agent_description": "Expert in research and data collection",
# "model_name": "claude-sonnet-4-6",
# "timeout": 30,
# "max_retries": 3,
# "verbose": True
# }
Production Features
Persistence Mode
Enable automatic restart and failsafe protection:
deployer = AOP(
server_name="ProductionCluster",
persistence=True, # Enable persistence mode
max_restart_attempts=10, # Max restart attempts
restart_delay=5.0, # Delay between restarts (seconds)
)
Network Monitoring
Automatic network connection monitoring and retry:
deployer = AOP(
server_name="ProductionCluster",
network_monitoring=True, # Enable network monitoring
max_network_retries=5, # Max reconnection attempts
network_retry_delay=10.0, # Delay between retries
network_timeout=30.0, # Connection timeout
)
Error Handling and Logging
deployer = AOP(
server_name="ProductionCluster",
verbose=True, # Enable verbose logging
traceback_enabled=True, # Enable detailed tracebacks
log_level="INFO", # Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
)
Transport Options
AOP supports multiple transport types for different deployment scenarios:
# HTTP transport (default)
deployer = AOP(
server_name="HTTPCluster",
transport="streamable-http",
host="localhost",
port=8000,
)
# Start the server
deployer.run()
Advanced Configuration
# Define custom input/output schemas
input_schema = {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"filters": {"type": "object", "description": "Search filters"},
},
"required": ["query"],
}
output_schema = {
"type": "object",
"properties": {
"results": {"type": "array", "description": "Search results"},
"total": {"type": "integer", "description": "Total results"},
},
}
deployer.add_agent(
agent=search_agent,
tool_name="search_tool",
input_schema=input_schema,
output_schema=output_schema,
)
Per-Agent Configuration
# Different settings for different agents
deployer.add_agent(
agent=fast_agent,
tool_name="fast_tool",
timeout=10, # Quick timeout
max_retries=1, # Minimal retries
verbose=False, # Quiet
)
deployer.add_agent(
agent=critical_agent,
tool_name="critical_tool",
timeout=300, # Long timeout
max_retries=10, # Many retries
verbose=True, # Detailed logging
)
Best Practices
1. Use Queue-Based Execution
Always enable queuing for production deployments:
deployer = AOP(
queue_enabled=True,
max_workers_per_agent=3, # Adjust based on load
)
2. Set Appropriate Timeouts
Configure timeouts based on agent complexity:
# Quick agents
deployer.add_agent(agent, timeout=15)
# Complex agents
deployer.add_agent(agent, timeout=120)
Regularly check queue statistics:
import time
while True:
stats = deployer.get_queue_stats()
print(f"Queue stats: {stats}")
time.sleep(60) # Check every minute
4. Handle Graceful Shutdown
import signal
def shutdown_handler(signum, frame):
print("Shutting down gracefully...")
# Stop all queue workers
for queue in deployer.task_queues.values():
queue.stop_workers()
exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
deployer.run()
agent = Agent(
agent_name="Research-Agent",
tags=["research", "data-collection", "analysis"],
capabilities=["web-search", "data-gathering", "report-generation"],
role="researcher",
)