Documentation Index
Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Swarms provides powerful utilities for scaling agent execution horizontally through concurrent and parallel processing. This guide covers the different scaling patterns, performance optimization techniques, and best practices for running agents at scale.
Scaling Patterns
Swarms offers multiple execution patterns for different scaling scenarios:
| Pattern | Use Case | Max Workers | Execution |
|---|---|---|---|
| Concurrent | I/O-bound tasks, API calls | 95% of CPU cores | Thread-based |
| Async | High-throughput async operations | Configurable | Event loop |
| Batch | Large-scale processing | Configurable | Batched |
| Grid | Different tasks per agent | 90% of CPU cores | Thread-based |
| UVLoop | Ultra-high performance async | 95% of CPU cores | Optimized event loop |
Concurrent Execution
Basic Concurrent Execution
Run multiple agents on the same task concurrently using ThreadPoolExecutor:
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently

# Create multiple agents
agents = [
    Agent(
        agent_name=f"Worker-{i}",
        model_name="gpt-5.4",
        max_loops=1,
    )
    for i in range(5)
]

# Run all agents concurrently on the same task
results = run_agents_concurrently(
    agents=agents,
    task="Analyze the potential impact of AI on healthcare",
    max_workers=None,  # Uses 95% of CPU cores by default
)

# Results is a list of outputs in completion order, so position i does NOT
# necessarily correspond to agents[i]. Label results by position only.
for i, result in enumerate(results, start=1):
    print(f"Result {i}: {result}")
Concurrent with Dictionary Output
Get results as a dictionary mapping agent names to outputs:
# Ask for a mapping of agent name -> output instead of a plain list.
# `agents` is the list created in the previous example.
results_dict = run_agents_concurrently(
    agents=agents,
    task="Generate a market analysis report",
    return_agent_output_dict=True,  # Return as dict
)

# Keys are agent names; per this guide the entries follow agent order.
separator = "-" * 50
for agent_name in results_dict:
    output = results_dict[agent_name]
    print(f"Result from {agent_name}:")
    print(output)
    print(separator)
Concurrent with Images
from swarms.structs.multi_agent_exec import run_agents_concurrently

# Send the same image and prompt to several agents at once.
# `vision_agents` is assumed to be a list of Agent instances backed by a
# vision-capable model, created elsewhere.
scan_path = "path/to/scan.jpg"
results = run_agents_concurrently(
    agents=vision_agents,
    task="Analyze this medical scan",
    img=scan_path,
    max_workers=5,
)
Asynchronous Execution
Basic Async Execution
import asyncio

from swarms import Agent
from swarms.structs.multi_agent_exec import (
    run_agent_async,  # single-agent variant; not used in this example
    run_agents_concurrently_async,
)


async def process_tasks():
    """Create ten agents, run them concurrently on one task, return outputs."""
    agents = [
        Agent(agent_name=f"Async-Agent-{i}", model_name="gpt-5.4")
        for i in range(10)
    ]
    # Run all agents asynchronously
    results = await run_agents_concurrently_async(
        agents=agents,
        task="Process this data",
    )
    return results


# Run the async function
results = asyncio.run(process_tasks())
For maximum async performance, use `run_agents_concurrently_uvloop`, which runs on uvloop (Linux/macOS) or winloop (Windows):
from swarms.structs.multi_agent_exec import (
    run_agents_concurrently_uvloop,
    run_agents_with_tasks_uvloop,
)

# `agents` is a list of Agent instances created in an earlier example.

# Automatically uses uvloop on Unix, winloop on Windows
results = run_agents_concurrently_uvloop(
    agents=agents,
    task="High-throughput task processing",
    max_workers=None,  # Uses 95% of CPU cores
)

# Also supports different tasks per agent: one task string per agent.
results = run_agents_with_tasks_uvloop(
    agents=agents,
    tasks=[f"Task {i}" for i in range(len(agents))],
    max_workers=10,
)
Batch Processing
Batched Concurrent Execution
Process agents in batches to avoid resource exhaustion:
import os

from swarms import Agent
from swarms.structs.multi_agent_exec import (
    run_agents_concurrently_multiprocess,
)

# Process large number of agents in batches
agents = [Agent(agent_name=f"Agent-{i}") for i in range(100)]

# os.cpu_count() returns None when the core count cannot be determined;
# fall back to one so batch_size is always a positive int.
batch_size = os.cpu_count() or 1

results = run_agents_concurrently_multiprocess(
    agents=agents,
    task="Process this task",
    batch_size=batch_size,  # Process in CPU-sized batches
)
Grid Execution (Different Tasks)
Run different tasks across different agents:
from swarms import Agent
from swarms.structs.multi_agent_exec import batched_grid_agent_execution

# Create specialized agents
agents = [
    Agent(agent_name="Researcher", system_prompt="Research expert"),
    Agent(agent_name="Writer", system_prompt="Content writer"),
    Agent(agent_name="Analyst", system_prompt="Data analyst"),
]

# Different task for each agent (tasks[i] goes to agents[i])
tasks = [
    "Research AI trends",
    "Write a blog post",
    "Analyze market data",
]

# Execute in parallel
results = batched_grid_agent_execution(
    agents=agents,
    tasks=tasks,
    max_workers=None,  # Uses 90% of CPU cores
)

# Results maintain order of input agents, so pair them positionally.
for agent, task, result in zip(agents, tasks, results):
    print(f"{agent.agent_name} completed: {task}")
    print(f"Result: {result}")
Batch with Agent-Task Pairs
from swarms.structs.multi_agent_exec import run_agents_with_different_tasks

# Each entry pairs an existing Agent with the task it should run.
# researcher, writer, analyst and editor are Agent instances created earlier.
agent_task_pairs = [
    (researcher, "Research quantum computing"),
    (writer, "Write about AI ethics"),
    (analyst, "Analyze stock trends"),
    (editor, "Edit research paper"),
    # ... hundreds more pairs
]

BATCH_SIZE = 10  # pairs processed per batch
WORKERS_PER_BATCH = 5  # workers used within each batch

results = run_agents_with_different_tasks(
    agent_task_pairs=agent_task_pairs,
    batch_size=BATCH_SIZE,
    max_workers=WORKERS_PER_BATCH,
)
Queue-Based Scaling
For production deployments, use AOP’s queue-based execution:
from swarms import Agent
from swarms.structs.aop import AOP

NUM_AGENTS = 10

# A pool of identical worker agents to expose through AOP.
agents = [
    Agent(agent_name=f"Worker-{index}", model_name="claude-sonnet-4-6")
    for index in range(NUM_AGENTS)
]

# Queue-based deployment: each agent gets its own workers and task queue.
deployer = AOP(
    server_name="ScalableCluster",
    agents=agents,
    queue_enabled=True,
    max_workers_per_agent=5,  # 5 workers per agent
    max_queue_size_per_agent=1000,  # Large queue capacity
)
deployer.run()
Queue Configuration for Scale
# Worker configuration: more workers = more throughput
worker_settings = {"max_workers_per_agent": 10}

# Queue configuration
queue_settings = {
    "max_queue_size_per_agent": 10000,  # Large queue
    "processing_timeout": 60,  # Adjust based on task complexity
    "retry_delay": 0.5,  # Quick retries
}

# NOTE(review): no agents are passed here; presumably they are registered
# on the deployer afterwards — confirm against the AOP API.
deployer = AOP(
    server_name="HighThroughputCluster",
    queue_enabled=True,
    **worker_settings,
    **queue_settings,
    verbose=False,  # Reduce logging overhead
)
Worker Configuration
import os

from swarms.structs.multi_agent_exec import run_agents_concurrently

# Calculate optimal worker count.
# os.cpu_count() may return None, so fall back to a single core.
num_cores = os.cpu_count() or 1

# For I/O-bound tasks (API calls, network): workers mostly wait, so oversubscribe
io_bound_workers = num_cores * 2  # 2x cores
# For CPU-bound tasks
cpu_bound_workers = num_cores  # 1x cores
# For mixed workloads
mixed_workers = int(num_cores * 1.5)  # 1.5x cores

# `agents` is a list of Agent instances created earlier.
results = run_agents_concurrently(
    agents=agents,
    task="Task",
    max_workers=io_bound_workers,
)
Dynamic Context Window
Optimize token usage with dynamic context windows:
from swarms import Agent

agent = Agent(
    agent_name="Optimized-Agent",
    model_name="claude-sonnet-4-6",
    dynamic_context_window=True,  # Auto-manage context
    context_length=8000,  # presumably a token budget — confirm in Agent docs
)
Memory Management
# Truncate history to prevent memory bloat: start with a small context length...
agent = Agent(
    agent_name="Memory-Efficient-Agent",
    model_name="claude-sonnet-4-6",
    context_length=4000,
)

MAX_RECENT_MESSAGES = 20

# ...and trim the history manually after each task.
# NOTE(review): confirm Agent.truncate_history accepts `max_messages`.
for task in large_task_list:
    result = agent.run(task)
    agent.truncate_history(max_messages=MAX_RECENT_MESSAGES)  # Keep only recent messages
Batch Size Tuning
def find_optimal_batch_size(agents, test_task, batch_sizes=(5, 10, 20, 50, 100)):
    """
    Find the fastest batch size for ``run_agents_concurrently_multiprocess``.

    Every candidate batch size processes the SAME full list of ``agents`` on
    ``test_task``, so the measured durations are directly comparable. (Timing
    ``agents[:batch_size]`` instead would benchmark different amounts of work
    and always favour the smallest size.)

    Args:
        agents: Agents to benchmark with.
        test_task: Task given to every agent.
        batch_sizes: Candidate batch sizes to try, in order.

    Returns:
        The batch size with the shortest measured duration (the first one
        tried wins ties).

    Raises:
        ValueError: If ``batch_sizes`` is empty (from ``min``).
    """
    import time

    results = {}
    for batch_size in batch_sizes:
        # perf_counter is monotonic and high-resolution: right for durations.
        start = time.perf_counter()
        run_agents_concurrently_multiprocess(
            agents,
            test_task,
            batch_size=batch_size,
        )
        duration = time.perf_counter() - start
        results[batch_size] = duration
        print(f"Batch size {batch_size}: {duration:.2f}s")

    optimal = min(results, key=results.get)
    print(f"Optimal batch size: {optimal}")
    return optimal
Load Balancing
Round-Robin Distribution
class LoadBalancer:
    """Hand tasks to a fixed pool of agents in round-robin order.

    Tasks are executed one after another (not concurrently); with ``n``
    agents, agent ``k`` receives tasks ``k``, ``k + n``, ``k + 2n``, ...
    """

    def __init__(self, agents):
        self.agents = agents
        # Index into ``self.agents`` of the agent that takes the next task.
        self.current_index = 0

    def get_next_agent(self):
        """Return the agent whose turn it is and advance the rotation."""
        chosen = self.agents[self.current_index]
        pool_size = len(self.agents)
        self.current_index = (self.current_index + 1) % pool_size
        return chosen

    def process_tasks(self, tasks):
        """Run each task on the next agent in rotation; return outputs in task order."""
        return [self.get_next_agent().run(task) for task in tasks]
# Usage
# `agents` and `large_task_list` are assumed to be defined earlier.
balancer = LoadBalancer(agents)
results = balancer.process_tasks(large_task_list)
Priority-Based Distribution
import heapq
class PriorityLoadBalancer:
    """Send each task to the agent with the lowest accumulated load.

    ``self.agent_load`` is a heap of ``(load, index, agent)`` entries, where
    ``index`` is the agent's position in the list passed to ``__init__``.
    Because ``index`` is unique, ties on ``load`` are broken by it and
    ``heapq`` never has to compare two agent objects (which generally do
    not support ``<`` and would raise ``TypeError``).
    """

    def __init__(self, agents):
        # Track agent load (priority queue); every agent starts at load 0.
        self.agent_load = [(0, index, agent) for index, agent in enumerate(agents)]
        heapq.heapify(self.agent_load)

    def assign_task(self, task, priority=0):
        """Run ``task`` on the least-loaded agent and return its result.

        Args:
            task: Task passed to ``agent.run``.
            priority: Subtracted from the load increment, so a larger value
                leaves the chosen agent looking less loaded afterwards.
                NOTE(review): confirm this is the intended weighting.

        Raises:
            IndexError: If the balancer has no agents.
            Exception: Anything raised by ``agent.run`` propagates, but the
                agent is returned to the pool first.
        """
        # Get least loaded agent
        load, index, agent = heapq.heappop(self.agent_load)
        try:
            # Process task
            return agent.run(task)
        finally:
            # Re-add even on failure so one exception does not permanently
            # remove the agent from the pool.
            heapq.heappush(self.agent_load, (load + 1 - priority, index, agent))
Monitoring at Scale
Real-Time Metrics
import time
from collections import defaultdict
class ScalingMetrics:
    """Accumulate per-agent request counts, outcomes, and total durations."""

    def __init__(self):
        def _empty_entry():
            return {
                "requests": 0,
                "successes": 0,
                "failures": 0,
                "total_duration": 0.0,
            }

        # agent_name -> counters; entries are created on first access.
        self.agent_metrics = defaultdict(_empty_entry)

    def record(self, agent_name, success, duration):
        """Add one request for ``agent_name`` with its outcome and duration."""
        entry = self.agent_metrics[agent_name]
        entry["requests"] += 1
        entry["total_duration"] += duration
        outcome_key = "successes" if success else "failures"
        entry[outcome_key] += 1

    def get_summary(self):
        """Return formatted request count, success rate and mean duration per agent."""
        def _ratio(amount, count):
            # Agents with no requests report 0 rather than dividing by zero.
            return amount / count if count > 0 else 0

        return {
            name: {
                "requests": entry["requests"],
                "success_rate": f"{_ratio(entry['successes'], entry['requests']):.1%}",
                "avg_duration": f"{_ratio(entry['total_duration'], entry['requests']):.2f}s",
            }
            for name, entry in self.agent_metrics.items()
        }
metrics = ScalingMetrics()


def monitored_run(agent, task):
    """Run ``agent`` on ``task`` and record the outcome in ``metrics``.

    The call counts as a success only if ``agent.run`` returns; if it raises,
    a failure is recorded and the exception propagates unchanged.
    """
    success = False
    # perf_counter is monotonic, so durations are not skewed by wall-clock
    # adjustments the way time.time() differences can be.
    start = time.perf_counter()
    try:
        result = agent.run(task)
        success = True
        return result
    finally:
        duration = time.perf_counter() - start
        metrics.record(agent.agent_name, success, duration)
Throughput Monitoring
import threading
import time
class ThroughputMonitor:
    """Measure request throughput over a sliding time window.

    Args:
        window_size: Length of the sliding window in seconds.
    """

    def __init__(self, window_size=60):
        from collections import deque

        self.window_size = window_size
        # Request timestamps from time.monotonic(), oldest first. They are
        # appended under the lock from a monotonic clock, so the deque stays
        # sorted and expired entries can be dropped from the left in O(1).
        self.requests = deque()
        self.lock = threading.Lock()

    def _prune(self, now):
        """Drop timestamps outside the window ending at ``now``; caller holds the lock."""
        cutoff = now - self.window_size
        while self.requests and self.requests[0] <= cutoff:
            self.requests.popleft()

    def record_request(self):
        """Record one request at the current time."""
        with self.lock:
            now = time.monotonic()
            self.requests.append(now)
            self._prune(now)

    def get_throughput(self):
        """Return requests per second over the most recent window.

        Expired timestamps are pruned first, so the rate decays to zero once
        requests stop instead of repeating the last busy window forever.
        """
        with self.lock:
            self._prune(time.monotonic())
            return len(self.requests) / self.window_size

    def start_reporting(self, interval=10):
        """Print the current throughput every ``interval`` seconds.

        The reporter runs in a daemon thread, so it does not keep the
        process alive on exit.

        Returns:
            The started reporter thread.
        """
        def report():
            while True:
                throughput = self.get_throughput()
                print(f"Current throughput: {throughput:.2f} req/s")
                time.sleep(interval)

        thread = threading.Thread(target=report, daemon=True)
        thread.start()
        return thread
# Start a background reporter (daemon thread) that prints throughput periodically.
monitor = ThroughputMonitor()
monitor.start_reporting()
# Record each request
# `tasks` and `agent` are assumed to be defined earlier.
for task in tasks:
    result = agent.run(task)
    monitor.record_request()
Best Practices
1. Choose the Right Pattern
- I/O-bound tasks (API calls): use `run_agents_concurrently` with a high worker count
- CPU-bound tasks: use `run_agents_concurrently` with the worker count set to the number of CPU cores
- Async workloads: use `run_agents_concurrently_uvloop`
- Large scale: use AOP with queue-based execution
- Different tasks per agent: use `batched_grid_agent_execution`

2. Size the Worker Pool
import os

# Get CPU count; os.cpu_count() returns None if it cannot be determined.
num_cores = os.cpu_count() or 1

# I/O-bound: 2x cores
max_workers = num_cores * 2
# CPU-bound: 1x cores
max_workers = num_cores
# Conservative: 90-95% of cores, but never fewer than one worker —
# int(1 * 0.9) is 0, and executors reject max_workers <= 0.
max_workers = max(1, int(num_cores * 0.9))
3. Implement Graceful Degradation
import logging

logger = logging.getLogger(__name__)


def resilient_concurrent_run(agents, task, max_workers=None):
    """Run agents concurrently, degrading to sequential execution on failure.

    Args:
        agents: Agents to run on ``task``.
        task: Task given to every agent.
        max_workers: Passed through to ``run_agents_concurrently``.

    Returns:
        The output of ``run_agents_concurrently`` or, if that raised, a list
        of ``agent.run(task)`` results in agent order.
    """
    try:
        return run_agents_concurrently(
            agents=agents,
            task=task,
            max_workers=max_workers,
        )
    except Exception as e:
        # Deliberate top-level boundary: log with traceback, then degrade.
        logger.exception("Concurrent execution failed: %s", e)
        logger.info("Falling back to sequential execution")
    # Fallback to sequential. Every agent is re-run, including any that may
    # already have finished during the failed concurrent attempt.
    return [agent.run(task) for agent in agents]
4. Monitor Resource Usage
import psutil
import logging

logger = logging.getLogger(__name__)


def check_resources(cpu_threshold=90, memory_threshold=85, interval=1):
    """Sample CPU and memory usage and warn when either exceeds its threshold.

    Args:
        cpu_threshold: CPU percentage above which a warning is logged.
        memory_threshold: Virtual-memory percentage above which a warning
            is logged.
        interval: Seconds passed to ``psutil.cpu_percent``; the call blocks
            for this long while it samples.

    Returns:
        Dict with the ``cpu_percent`` and ``memory_percent`` readings.
    """
    cpu_percent = psutil.cpu_percent(interval=interval)
    memory_percent = psutil.virtual_memory().percent
    if cpu_percent > cpu_threshold:
        logger.warning("High CPU usage: %s%%", cpu_percent)
    if memory_percent > memory_threshold:
        logger.warning("High memory usage: %s%%", memory_percent)
    return {
        "cpu_percent": cpu_percent,
        "memory_percent": memory_percent,
    }
5. Use Appropriate Timeouts
from swarms import Agent

agent = Agent(
    agent_name="Timeout-Agent",
    model_name="claude-sonnet-4-6",
    timeout=30,  # presumably seconds; set based on expected task duration
)