> ## Documentation Index
> Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
> Use this file to discover all available pages before exploring further.

# Scaling Swarms

> Scale your multi-agent systems horizontally with concurrent and parallel execution

## Overview

Swarms provides powerful utilities for scaling agent execution horizontally through concurrent and parallel processing. This guide covers the different scaling patterns, performance optimization techniques, and best practices for running agents at scale.

## Scaling Patterns

Swarms offers multiple execution patterns for different scaling scenarios:

| Pattern        | Use Case                         | Max Workers      | Execution            |
| -------------- | -------------------------------- | ---------------- | -------------------- |
| **Concurrent** | I/O-bound tasks, API calls       | 95% of CPU cores | Thread-based         |
| **Async**      | High-throughput async operations | Configurable     | Event loop           |
| **Batch**      | Large-scale processing           | Configurable     | Batched              |
| **Grid**       | Different tasks per agent        | 90% of CPU cores | Thread-based         |
| **UVLoop**     | Ultra-high performance async     | 95% of CPU cores | Optimized event loop |

## Concurrent Execution

### Basic Concurrent Execution

Run multiple agents on the same task concurrently using `ThreadPoolExecutor`:

```python theme={null}
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently

# Create multiple agents
agents = [
    Agent(
        agent_name=f"Worker-{i}",
        model_name="gpt-5.4",
        max_loops=1,
    )
    for i in range(5)
]

# Run all agents concurrently on the same task
results = run_agents_concurrently(
    agents=agents,
    task="Analyze the potential impact of AI on healthcare",
    max_workers=None,  # Uses 95% of CPU cores by default
)

# Results is a list of outputs in completion order
for i, result in enumerate(results):
    print(f"Agent {i+1} result: {result}")
```

### Concurrent with Dictionary Output

Get results as a dictionary mapping agent names to outputs:

```python theme={null}
results_dict = run_agents_concurrently(
    agents=agents,
    task="Generate a market analysis report",
    return_agent_output_dict=True,  # Return as dict
)

# Results preserve agent order
for agent_name, output in results_dict.items():
    print(f"Result from {agent_name}:")
    print(output)
    print("-" * 50)
```

### Concurrent with Images

```python theme={null}
from swarms.structs.multi_agent_exec import run_agents_concurrently

# Process image across multiple agents
results = run_agents_concurrently(
    agents=vision_agents,
    task="Analyze this medical scan",
    img="path/to/scan.jpg",
    max_workers=5,
)
```

## Asynchronous Execution

### Basic Async Execution

```python theme={null}
import asyncio
from swarms.structs.multi_agent_exec import (
    run_agent_async,
    run_agents_concurrently_async,
)

async def process_tasks():
    agents = [
        Agent(agent_name=f"Async-Agent-{i}", model_name="gpt-5.4")
        for i in range(10)
    ]
    
    # Run all agents asynchronously
    results = await run_agents_concurrently_async(
        agents=agents,
        task="Process this data"
    )
    
    return results

# Run the async function
results = asyncio.run(process_tasks())
```

### High-Performance with UVLoop

For maximum async performance, use uvloop (Linux/macOS) or winloop (Windows):

```python theme={null}
from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop

# Automatically uses uvloop on Unix, winloop on Windows
results = run_agents_concurrently_uvloop(
    agents=agents,
    task="High-throughput task processing",
    max_workers=None,  # Uses 95% of CPU cores
)

# Also supports different tasks per agent
results = run_agents_with_tasks_uvloop(
    agents=agents,
    tasks=[f"Task {i}" for i in range(len(agents))],
    max_workers=10,
)
```

## Batch Processing

### Batched Concurrent Execution

Process agents in batches to avoid resource exhaustion:

```python theme={null}
from swarms.structs.multi_agent_exec import (
    run_agents_concurrently_multiprocess
)
import os

# Process large number of agents in batches
agents = [Agent(agent_name=f"Agent-{i}") for i in range(100)]

results = run_agents_concurrently_multiprocess(
    agents=agents,
    task="Process this task",
    batch_size=os.cpu_count(),  # Process in CPU-sized batches
)
```

### Grid Execution (Different Tasks)

Run different tasks across different agents:

```python theme={null}
from swarms.structs.multi_agent_exec import batched_grid_agent_execution

# Create specialized agents
agents = [
    Agent(agent_name="Researcher", system_prompt="Research expert"),
    Agent(agent_name="Writer", system_prompt="Content writer"),
    Agent(agent_name="Analyst", system_prompt="Data analyst"),
]

# Different task for each agent
tasks = [
    "Research AI trends",
    "Write a blog post",
    "Analyze market data",
]

# Execute in parallel
results = batched_grid_agent_execution(
    agents=agents,
    tasks=tasks,
    max_workers=None,  # Uses 90% of CPU cores
)

# Results maintain order of input agents
for i, result in enumerate(results):
    print(f"{agents[i].agent_name} completed: {tasks[i]}")
    print(f"Result: {result}")
```

### Batch with Agent-Task Pairs

```python theme={null}
from swarms.structs.multi_agent_exec import run_agents_with_different_tasks

# Create agent-task pairs
agent_task_pairs = [
    (researcher, "Research quantum computing"),
    (writer, "Write about AI ethics"),
    (analyst, "Analyze stock trends"),
    (editor, "Edit research paper"),
    # ... hundreds more pairs
]

# Process in batches
results = run_agents_with_different_tasks(
    agent_task_pairs=agent_task_pairs,
    batch_size=10,  # Process 10 at a time
    max_workers=5,  # Use 5 workers per batch
)
```

## Queue-Based Scaling

For production deployments, use AOP's queue-based execution:

```python theme={null}
from swarms import Agent
from swarms.structs.aop import AOP

# Create agents
agents = [
    Agent(agent_name=f"Worker-{i}", model_name="claude-sonnet-4-6")
    for i in range(10)
]

# Deploy with queue-based execution
deployer = AOP(
    server_name="ScalableCluster",
    agents=agents,
    queue_enabled=True,
    max_workers_per_agent=5,  # 5 workers per agent
    max_queue_size_per_agent=1000,  # Large queue capacity
)

deployer.run()
```

### Queue Configuration for Scale

```python theme={null}
deployer = AOP(
    server_name="HighThroughputCluster",
    queue_enabled=True,
    
    # Worker configuration
    max_workers_per_agent=10,  # More workers = more throughput
    
    # Queue configuration
    max_queue_size_per_agent=10000,  # Large queue
    processing_timeout=60,  # Adjust based on task complexity
    retry_delay=0.5,  # Quick retries
    
    # Performance tuning
    verbose=False,  # Reduce logging overhead
)
```

## Performance Optimization

### Worker Configuration

```python theme={null}
import os

# Calculate optimal worker count
num_cores = os.cpu_count()

# For I/O-bound tasks (API calls, network)
io_bound_workers = int(num_cores * 2)  # 2x cores

# For CPU-bound tasks
cpu_bound_workers = num_cores  # 1x cores

# For mixed workloads
mixed_workers = int(num_cores * 1.5)  # 1.5x cores

results = run_agents_concurrently(
    agents=agents,
    task="Task",
    max_workers=io_bound_workers,
)
```

### Dynamic Context Window

Optimize token usage with dynamic context windows:

```python theme={null}
agent = Agent(
    agent_name="Optimized-Agent",
    model_name="claude-sonnet-4-6",
    dynamic_context_window=True,  # Auto-manage context
    context_length=8000,
)
```

### Memory Management

```python theme={null}
# Truncate history to prevent memory bloat
agent = Agent(
    agent_name="Memory-Efficient-Agent",
    model_name="claude-sonnet-4-6",
    context_length=4000,
)

# Manual truncation after processing
for task in large_task_list:
    result = agent.run(task)
    agent.truncate_history(max_messages=20)  # Keep only recent messages
```

### Batch Size Tuning

```python theme={null}
def find_optimal_batch_size(agents, test_task):
    """
    Find optimal batch size through testing.
    """
    import time
    
    batch_sizes = [5, 10, 20, 50, 100]
    results = {}
    
    for batch_size in batch_sizes:
        start = time.time()
        run_agents_concurrently_multiprocess(
            agents[:batch_size],
            test_task,
            batch_size=batch_size,
        )
        duration = time.time() - start
        results[batch_size] = duration
        print(f"Batch size {batch_size}: {duration:.2f}s")
    
    optimal = min(results.items(), key=lambda x: x[1])
    print(f"Optimal batch size: {optimal[0]}")
    return optimal[0]
```

## Load Balancing

### Round-Robin Distribution

```python theme={null}
class LoadBalancer:
    def __init__(self, agents):
        self.agents = agents
        self.current_index = 0
    
    def get_next_agent(self):
        agent = self.agents[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.agents)
        return agent
    
    def process_tasks(self, tasks):
        results = []
        for task in tasks:
            agent = self.get_next_agent()
            result = agent.run(task)
            results.append(result)
        return results

# Usage
balancer = LoadBalancer(agents)
results = balancer.process_tasks(large_task_list)
```

### Priority-Based Distribution

```python theme={null}
import heapq

class PriorityLoadBalancer:
    def __init__(self, agents):
        # Track agent load (priority queue)
        self.agent_load = [(0, agent) for agent in agents]
        heapq.heapify(self.agent_load)
    
    def assign_task(self, task, priority=0):
        # Get least loaded agent
        load, agent = heapq.heappop(self.agent_load)
        
        # Process task
        result = agent.run(task)
        
        # Update load and re-add to queue
        heapq.heappush(self.agent_load, (load + 1 - priority, agent))
        
        return result
```

## Monitoring at Scale

### Real-Time Metrics

```python theme={null}
import time
from collections import defaultdict

class ScalingMetrics:
    def __init__(self):
        self.agent_metrics = defaultdict(lambda: {
            "requests": 0,
            "successes": 0,
            "failures": 0,
            "total_duration": 0.0,
        })
    
    def record(self, agent_name, success, duration):
        metrics = self.agent_metrics[agent_name]
        metrics["requests"] += 1
        metrics["total_duration"] += duration
        if success:
            metrics["successes"] += 1
        else:
            metrics["failures"] += 1
    
    def get_summary(self):
        summary = {}
        for agent_name, metrics in self.agent_metrics.items():
            avg_duration = (
                metrics["total_duration"] / metrics["requests"]
                if metrics["requests"] > 0 else 0
            )
            success_rate = (
                metrics["successes"] / metrics["requests"]
                if metrics["requests"] > 0 else 0
            )
            summary[agent_name] = {
                "requests": metrics["requests"],
                "success_rate": f"{success_rate:.1%}",
                "avg_duration": f"{avg_duration:.2f}s",
            }
        return summary

metrics = ScalingMetrics()

def monitored_run(agent, task):
    start = time.time()
    success = False
    try:
        result = agent.run(task)
        success = True
        return result
    finally:
        duration = time.time() - start
        metrics.record(agent.agent_name, success, duration)
```

### Throughput Monitoring

```python theme={null}
import threading
import time

class ThroughputMonitor:
    def __init__(self, window_size=60):
        self.window_size = window_size
        self.requests = []
        self.lock = threading.Lock()
    
    def record_request(self):
        with self.lock:
            now = time.time()
            self.requests.append(now)
            # Remove old requests outside window
            cutoff = now - self.window_size
            self.requests = [t for t in self.requests if t > cutoff]
    
    def get_throughput(self):
        with self.lock:
            return len(self.requests) / self.window_size
    
    def start_reporting(self, interval=10):
        def report():
            while True:
                throughput = self.get_throughput()
                print(f"Current throughput: {throughput:.2f} req/s")
                time.sleep(interval)
        
        thread = threading.Thread(target=report, daemon=True)
        thread.start()

monitor = ThroughputMonitor()
monitor.start_reporting()

# Record each request
for task in tasks:
    result = agent.run(task)
    monitor.record_request()
```

## Best Practices

### 1. Choose the Right Pattern

* **I/O-bound tasks** (API calls): Use `run_agents_concurrently` with high worker count
* **CPU-bound tasks**: Use `run_agents_concurrently` with worker count = CPU cores
* **Async workloads**: Use `run_agents_concurrently_uvloop`
* **Large scale**: Use AOP with queue-based execution
* **Mixed tasks**: Use `batched_grid_agent_execution`

### 2. Configure Workers Appropriately

```python theme={null}
import os

# Get CPU count
num_cores = os.cpu_count()

# I/O-bound: 2x cores
max_workers = num_cores * 2

# CPU-bound: 1x cores  
max_workers = num_cores

# Conservative: 90-95% of cores
max_workers = int(num_cores * 0.9)
```

### 3. Implement Graceful Degradation

```python theme={null}
def resilient_concurrent_run(agents, task, max_workers=None):
    try:
        return run_agents_concurrently(
            agents=agents,
            task=task,
            max_workers=max_workers,
        )
    except Exception as e:
        logger.error(f"Concurrent execution failed: {e}")
        logger.info("Falling back to sequential execution")
        # Fallback to sequential
        return [agent.run(task) for agent in agents]
```

### 4. Monitor Resource Usage

```python theme={null}
import psutil

def check_resources():
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_percent = psutil.virtual_memory().percent
    
    if cpu_percent > 90:
        logger.warning(f"High CPU usage: {cpu_percent}%")
    if memory_percent > 85:
        logger.warning(f"High memory usage: {memory_percent}%")
    
    return {
        "cpu_percent": cpu_percent,
        "memory_percent": memory_percent,
    }
```

### 5. Use Appropriate Timeouts

```python theme={null}
agent = Agent(
    agent_name="Timeout-Agent",
    model_name="claude-sonnet-4-6",
    timeout=30,  # Set based on expected task duration
)
```

## Related Resources

* [Agent Orchestration Protocol](/deployment/agent-orchestration-protocol)
* [Production Best Practices](/deployment/production-best-practices)
* [Monitoring Guide](/deployment/monitoring)
* [Multi-Agent Execution API](https://docs.swarms.world/en/latest/swarms/structs/multi_agent_exec/)
