The SwarmRouter is the highest-level multi-agent abstraction in the framework. Pass it a list of agents and a swarm_type — it builds the matching orchestrator (SequentialWorkflow, ConcurrentWorkflow, HierarchicalSwarm, MixtureOfAgents, HeavySwarm, etc.) and forwards run() calls to it. To switch architectures, change one string. The underlying swarm is built lazily on the first run() call and cached on the instance — repeated calls reuse it (keyed by swarm_type, agent identities, and construction-time config).

When to Use

  • Flexible orchestration — switch between swarm types without rewriting code
  • Strategy comparison — A/B different architectures on the same task
  • Unified interface — one API for every supported swarm
  • Dynamic selection — choose swarm_type at runtime
  • Production deployments — standardized swarm management with optional autosave

Key Features

  • 16 named swarm types supported (listed below), plus "auto" for automatic routing
  • Factory pattern with O(1) lookup and per-instance swarm cache
  • Pre-flight reliability checks
  • Optional autosave of config.json / state.json / metadata.json
  • Shared memory injection across agents
  • Multi-agent collaboration prompt injection
  • Per-swarm-type specialized parameters (HeavySwarm, AgentRearrange, GroupChat, etc.)
  • Inherits SerializableMixin, so to_dict() is available for telemetry / persistence

Supported Swarm Types

from swarms.structs.swarm_router import SwarmType

# SwarmType is a Literal — pass the matching string:
"SequentialWorkflow"
"ConcurrentWorkflow"
"AgentRearrange"
"MixtureOfAgents"
"GroupChat"
"MultiAgentRouter"
"AutoSwarmBuilder"
"HierarchicalSwarm"
"MajorityVoting"
"CouncilAsAJudge"
"HeavySwarm"
"BatchedGridWorkflow"
"LLMCouncil"
"DebateWithJudge"
"RoundRobin"
"PlannerWorkerSwarm"
"auto"             # let the router decide

Basic Example

from swarms import Agent, SwarmRouter

writer = Agent(
    agent_name="Writer",
    system_prompt="You are a creative writer.",
    model_name="gpt-5.4",
)

editor = Agent(
    agent_name="Editor",
    system_prompt="You are an expert editor.",
    model_name="gpt-5.4",
)

reviewer = Agent(
    agent_name="Reviewer",
    system_prompt="You are a quality reviewer.",
    model_name="gpt-5.4",
)

agents = [writer, editor, reviewer]
task = "Write a short story about AI"

# Sequential
seq = SwarmRouter(swarm_type="SequentialWorkflow", agents=agents)
seq_result = seq.run(task)

# Concurrent
conc = SwarmRouter(swarm_type="ConcurrentWorkflow", agents=agents)
conc_result = conc.run(task)

# Mixture of Agents (last agent acts as aggregator)
moa = SwarmRouter(swarm_type="MixtureOfAgents", agents=agents)
moa_result = moa.run(task)

Key Parameters

  • swarm_type (SwarmType, default "SequentialWorkflow"): Which orchestrator to instantiate. See the list above.
  • agents (List[Union[Agent, Callable]], required): The agent roster the swarm will use. The role of each agent depends on swarm_type (e.g. for DebateWithJudge the first two are debaters and the third is the judge; for MixtureOfAgents the last agent is the aggregator).
  • id (str): Stable identifier for this router instance. Auto-generated if omitted.
  • name (str, default "swarm-router"): Human-readable name. Used for log lines and autosave directory naming.
  • description (str, default "Routes your task to the desired swarm"): Free-text description of what this router is for.
  • max_loops (int, default 1): Iteration count for the underlying swarm. Semantics depend on swarm_type (e.g. for MixtureOfAgents this is the number of layers).
  • output_type (OutputType, default "dict-all-except-first"): How the final swarm output is formatted.
  • autosave (bool, default False): When True, save config.json at init and state.json + metadata.json after each run.
  • autosave_use_timestamp (bool, default True): If True, use a timestamp in the autosave directory name; otherwise use a UUID.
  • rearrange_flow (str): Required when swarm_type="AgentRearrange". Flow DSL like "A -> B, C -> D".
  • shared_memory_system (Any): Memory backend injected into every agent’s long_term_memory.
  • multi_agent_collab_prompt (bool, default True): Append the multi-agent collaboration preamble to every agent’s system prompt.
  • list_all_agents (bool, default False): When True, every agent is told about every other agent at the start of a run.
  • conversation (Any): Pre-existing conversation object to seed the swarm with.
  • agents_config (Dict[Any, Any]): Optional per-agent config overrides.
  • speaker_fn (Callable): Speaker-selection function for GroupChat-style swarms.
  • speaker_function (str): Name-based speaker function selector (alternative to passing speaker_fn).
  • telemetry_enabled (bool, default False): When True, snapshot each agent’s config into self.agent_config.
  • verbose (bool, default False): Emit info / debug logs (reliability check, cache hits, swarm creation).

Swarm-Specific Parameters

AgentRearrange

  • rearrange_flow (str, required): Flow DSL (e.g. "researcher -> writer, editor").
router = SwarmRouter(
    swarm_type="AgentRearrange",
    agents=agents,
    rearrange_flow="researcher -> writer, editor",
)
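
To make the flow string concrete, here is a minimal sketch of how such a DSL could be read. It assumes that "->" orders the steps and that "," groups agents within a single step, and that names in the flow should match entries in the agent roster. parse_flow and unknown_agents are hypothetical helpers for illustration; they are not part of the SwarmRouter API.

```python
from typing import List


def parse_flow(flow: str) -> List[List[str]]:
    """Split a flow string into ordered steps; each step lists its agent names.

    Assumption: "->" orders steps and "," groups agents within one step.
    """
    steps = []
    for raw_step in flow.split("->"):
        names = [name.strip() for name in raw_step.split(",") if name.strip()]
        if not names:
            raise ValueError(f"Empty step in flow: {flow!r}")
        steps.append(names)
    return steps


def unknown_agents(flow: str, agent_names: List[str]) -> List[str]:
    """Return names used in the flow that are missing from the roster."""
    known = set(agent_names)
    return [n for step in parse_flow(flow) for n in step if n not in known]


steps = parse_flow("researcher -> writer, editor")
print(steps)  # [['researcher'], ['writer', 'editor']]
```

Running a check like unknown_agents before constructing the router can surface a misspelled agent name early, before any model call is made.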

HeavySwarm

  • heavy_swarm_question_agent_model_name (str, default "gpt-4.1"): Model for the HeavySwarm question agent.
  • heavy_swarm_worker_model_name (str, default "gpt-4.1"): Model for HeavySwarm workers.
  • heavy_swarm_swarm_show_output (bool, default True): Print per-agent output for HeavySwarm.
  • heavy_swarm_variant (Literal["default", "medium", "heavy"], default "default"): HeavySwarm architecture variant. See the Heavy Swarm docs.
  • heavy_swarm_max_loops (int, default 1): Iteration count for HeavySwarm multi-loop refinement.
  • heavy_swarm_timeout (int, default 900): Per-worker wall-clock cap (seconds) for HeavySwarm.
  • worker_tools (List[Callable]): Tools passed to HeavySwarm workers.
router = SwarmRouter(
    swarm_type="HeavySwarm",
    agents=agents,
    heavy_swarm_worker_model_name="claude-sonnet-4-20250514",
    heavy_swarm_question_agent_model_name="gpt-4.1",
    heavy_swarm_variant="medium",
    heavy_swarm_max_loops=2,
)

HierarchicalSwarm

router = SwarmRouter(
    swarm_type="HierarchicalSwarm",
    agents=worker_agents,
    max_loops=2,    # feedback loops
)

CouncilAsAJudge

  • council_judge_model_name (str, default "gpt-5.4"): Model used as the council judge.

LLMCouncil

  • chairman_model (str, default "gpt-5.1"): Chairman model for LLMCouncil.

Advanced Features

Shared Memory

from swarms.memory import ChromaDB

memory = ChromaDB()

router = SwarmRouter(
    swarm_type="SequentialWorkflow",
    agents=agents,
    shared_memory_system=memory,
)

Autosave

router = SwarmRouter(
    swarm_type="HierarchicalSwarm",
    agents=agents,
    autosave=True,
    autosave_use_timestamp=True,
)

# Saves to: $WORKSPACE_DIR/swarms/SwarmRouter/{swarm-name}-{timestamp}/
#   config.json    (on initialization)
#   state.json     (after each run)
#   metadata.json  (after each run)
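
The directory layout in the comment above can be sketched as a small path builder. This is an illustration only: autosave_dir is a hypothetical helper, the fallback to the current directory when WORKSPACE_DIR is unset is an assumption, and the timestamp format is a guess because this page does not specify it.

```python
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional


def autosave_dir(
    name: str,
    use_timestamp: bool = True,
    workspace: Optional[str] = None,
) -> Path:
    """Build <workspace>/swarms/SwarmRouter/<name>-<suffix> (illustrative only)."""
    # Assumption: fall back to the current directory when WORKSPACE_DIR is unset.
    root = Path(workspace or os.environ.get("WORKSPACE_DIR", "."))
    if use_timestamp:
        # Assumption: the real timestamp format may differ.
        suffix = datetime.now().strftime("%Y%m%d-%H%M%S")
    else:
        # autosave_use_timestamp=False switches the suffix to a UUID.
        suffix = uuid.uuid4().hex
    return root / "swarms" / "SwarmRouter" / f"{name}-{suffix}"


path = autosave_dir("Production-Swarm", workspace="/tmp/workspace")
print(path.parent.as_posix())  # /tmp/workspace/swarms/SwarmRouter
```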

Tell Every Agent About Every Other Agent

router = SwarmRouter(
    swarm_type="SequentialWorkflow",
    agents=agents,
    list_all_agents=True,
)

Methods

run(task=None, img=None, tasks=None, ...)

Execute the configured swarm with a single task (or a list of tasks, when the underlying swarm accepts one).
result = router.run(
    task="Analyze market trends",
    img=None,
)

__call__(task, img=None, imgs=None, ...)

The router is directly callable as a shortcut for run().
result = router("Analyze market trends")

batch_run(tasks, img=None, imgs=None, ...)

Process multiple tasks sequentially. Re-uses the cached underlying swarm.
tasks = ["Task 1", "Task 2", "Task 3"]
results = router.batch_run(tasks)

concurrent_run(...)

Run multiple tasks concurrently.
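
The signature of concurrent_run is not shown on this page, so the following is a generic sketch of the idea rather than the library's implementation. run_many_concurrently is a hypothetical helper, and fake_run stands in for router.run so the example executes without any model calls.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_many_concurrently(
    run: Callable[[str], str],
    tasks: List[str],
    max_workers: int = 4,
) -> List[str]:
    """Apply run() to every task in a thread pool; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run, tasks))


def fake_run(task: str) -> str:
    # Stand-in for router.run so the sketch runs without any model calls.
    return f"done: {task}"


results = run_many_concurrently(fake_run, ["Task 1", "Task 2", "Task 3"])
print(results)  # ['done: Task 1', 'done: Task 2', 'done: Task 3']
```

Whether a single router instance is safe to share across threads is not stated here, so treat that as something to verify before passing router.run to a helper like this.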

to_dict()

Inherited from SerializableMixin. Returns a JSON-friendly snapshot of the router configuration.

Use Cases

Strategy Comparison

strategies = [
    "SequentialWorkflow",
    "ConcurrentWorkflow",
    "MixtureOfAgents",
]

results = {}
for strategy in strategies:
    router = SwarmRouter(swarm_type=strategy, agents=agents)
    results[strategy] = router.run(task)

for strategy, result in results.items():
    print(f"\n{strategy}:\n{result}")
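
When comparing strategies it is often useful to record timing alongside the output. The sketch below shows one way to do that with stand-in runners; compare is a hypothetical helper, and with a real setup each entry would be the run method of a SwarmRouter built for that swarm_type.

```python
import time
from typing import Callable, Dict


def compare(
    runners: Dict[str, Callable[[str], str]],
    task: str,
) -> Dict[str, Dict[str, object]]:
    """Run the same task through each runner; record the result and elapsed seconds."""
    report = {}
    for name, run in runners.items():
        start = time.perf_counter()
        result = run(task)
        report[name] = {"result": result, "seconds": time.perf_counter() - start}
    return report


# Stand-ins for router.run so the sketch runs without model calls.
runners = {
    "SequentialWorkflow": lambda task: f"sequential: {task}",
    "ConcurrentWorkflow": lambda task: f"concurrent: {task}",
}

report = compare(runners, "Write a short story about AI")
for name, row in report.items():
    print(f"{name}: {row['seconds']:.6f}s, {len(row['result'])} chars")
```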

Dynamic Swarm Selection

def select_swarm_type(task_complexity: str) -> str:
    if task_complexity == "simple":
        return "SequentialWorkflow"
    elif task_complexity == "parallel":
        return "ConcurrentWorkflow"
    elif task_complexity == "complex":
        return "HierarchicalSwarm"
    return "MixtureOfAgents"

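def analyze_complexity(task: str) -> str:
    # Hypothetical placeholder: replace with your own heuristic or classifier.
    return "complex" if "complex" in task.lower() else "simple"

task = "Complex analysis required"
complexity = analyze_complexity(task)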

router = SwarmRouter(
    swarm_type=select_swarm_type(complexity),
    agents=agents,
)

result = router.run(task)

Production Pipeline with Fallback

class ProductionSwarmRouter:
    def __init__(self, agents):
        self.router = SwarmRouter(
            swarm_type="HierarchicalSwarm",
            agents=agents,
            autosave=True,
            verbose=True,
        )

    def process(self, task):
        try:
            return self.router.run(task)
        except Exception as e:
            print(f"Error with HierarchicalSwarm: {e}")
            fallback = SwarmRouter(
                swarm_type="SequentialWorkflow",
                agents=self.router.agents,
            )
            return fallback.run(task)
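
The single hard-coded fallback above generalizes to an ordered list of strategies. The following is a minimal sketch of that pattern with stand-in callables; run_with_fallbacks, flaky, and steady are hypothetical names, and in practice each callable would be the run method of a separately configured router.

```python
from typing import Callable, Dict, List, Tuple


def run_with_fallbacks(
    runners: List[Tuple[str, Callable[[str], str]]],
    task: str,
) -> Tuple[str, str]:
    """Try each (label, run) pair in order; return (label, result) of the first success."""
    errors: Dict[str, str] = {}
    for label, run in runners:
        try:
            return label, run(task)
        except Exception as exc:  # broad on purpose: any failure triggers the next option
            errors[label] = str(exc)
    raise RuntimeError(f"All strategies failed: {errors}")


def flaky(task: str) -> str:
    # Stand-in for a router whose run() fails.
    raise ValueError("director agent unavailable")


def steady(task: str) -> str:
    # Stand-in for a simpler router that succeeds.
    return f"sequential result for {task!r}"


label, result = run_with_fallbacks(
    [("HierarchicalSwarm", flaky), ("SequentialWorkflow", steady)],
    "Summarize Q3",
)
print(label)  # SequentialWorkflow
```

Collecting the per-strategy error messages keeps the final exception informative when every option fails.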

Factory Pattern

The router maintains a per-instance factory dispatch table and a swarm cache:
# Internal layout
self._swarm_factory = {
    "SequentialWorkflow": self._create_sequential_workflow,
    "ConcurrentWorkflow": self._create_concurrent_workflow,
    "AgentRearrange":     self._create_agent_rearrange,
    "MixtureOfAgents":    self._create_mixture_of_agents,
    "HierarchicalSwarm":  self._create_hierarchical_swarm,
    "GroupChat":          self._create_group_chat,
    "HeavySwarm":         self._create_heavy_swarm,
    # ... one entry per SwarmType ...
    "auto":               self._create_auto,
}

# First run() builds, subsequent runs reuse:
swarm = self._swarm_cache.get(cache_key) or factory(*args, **kwargs)
self._swarm_cache[cache_key] = swarm
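
The same idea can be shown as a self-contained toy. TinyRouter below is not the real class: it is a hypothetical model of a dispatch table plus per-instance cache, with strings standing in for agents and a cache key assumed to be the swarm type plus the roster.

```python
from typing import Callable, Dict, List, Tuple


class TinyRouter:
    """Toy model of a dispatch table plus per-instance cache (not the real class)."""

    def __init__(self, swarm_type: str, agents: List[str]):
        self.swarm_type = swarm_type
        self.agents = agents
        self.builds = 0  # counts how many times a swarm was constructed
        self._cache: Dict[Tuple[str, Tuple[str, ...]], Callable[[str], str]] = {}
        self._factory: Dict[str, Callable[[], Callable[[str], str]]] = {
            "SequentialWorkflow": self._create_sequential,
            "ConcurrentWorkflow": self._create_concurrent,
        }

    def _create_sequential(self) -> Callable[[str], str]:
        return lambda task: " -> ".join(self.agents) + f": {task}"

    def _create_concurrent(self) -> Callable[[str], str]:
        return lambda task: " | ".join(self.agents) + f": {task}"

    def run(self, task: str) -> str:
        # Assumed cache key: swarm type plus the agent roster.
        key = (self.swarm_type, tuple(self.agents))
        if key not in self._cache:
            self._cache[key] = self._factory[self.swarm_type]()  # O(1) dict lookup
            self.builds += 1
        return self._cache[key](task)


router = TinyRouter("SequentialWorkflow", ["Writer", "Editor"])
router.run("first task")
router.run("second task")
print(router.builds)  # 1
```

The explicit membership test is used here instead of "get(...) or factory(...)" so that a cached object that happens to be falsy is not rebuilt.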

Reliability Checks

reliability_check() runs automatically during construction:
# Validates:
# - swarm_type is not None
# - swarm_type is a string
# - swarm_type is one of the valid SwarmType values
# - rearrange_flow is set when swarm_type="AgentRearrange"
# - max_loops > 0

from swarms.structs.swarm_router import SwarmRouterConfigError

try:
    router = SwarmRouter(
        swarm_type="InvalidType",
        agents=agents,
    )
except SwarmRouterConfigError as e:
    print(e)
    # Includes the offending value and the list of valid types
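
The validation rules listed above can be written out as a standalone function. This is a sketch under stated assumptions, not the library's reliability_check(): DemoSwarmType is an abbreviated stand-in for SwarmType, DemoConfigError is a hypothetical stand-in for SwarmRouterConfigError, and the error messages are invented for illustration.

```python
from typing import Literal, Optional, get_args

# Abbreviated stand-in for SwarmType; the real Literal lists every supported type.
DemoSwarmType = Literal["SequentialWorkflow", "ConcurrentWorkflow", "AgentRearrange", "auto"]


class DemoConfigError(Exception):
    """Hypothetical stand-in for SwarmRouterConfigError."""


def check_config(
    swarm_type: Optional[str],
    max_loops: int = 1,
    rearrange_flow: Optional[str] = None,
) -> None:
    """Raise DemoConfigError on the first rule that the configuration violates."""
    valid = get_args(DemoSwarmType)
    if swarm_type is None:
        raise DemoConfigError("swarm_type cannot be None")
    if not isinstance(swarm_type, str):
        raise DemoConfigError(f"swarm_type must be a string, got {type(swarm_type).__name__}")
    if swarm_type not in valid:
        raise DemoConfigError(f"Invalid swarm_type {swarm_type!r}; valid types: {', '.join(valid)}")
    if swarm_type == "AgentRearrange" and not rearrange_flow:
        raise DemoConfigError("rearrange_flow is required for AgentRearrange")
    if max_loops <= 0:
        raise DemoConfigError("max_loops must be greater than 0")


try:
    check_config("InvalidType")
except DemoConfigError as err:
    print(err)
```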

Error Handling

from swarms.structs.swarm_router import (
    SwarmRouterConfigError,
    SwarmRouterRunError,
)

try:
    result = router.run("Task")
except SwarmRouterRunError as e:
    print(f"Execution failed: {e}")
    # The exception body includes:
    # - The reason for failure
    # - A formatted traceback
    # - Troubleshooting hints
except SwarmRouterConfigError as e:
    print(f"Configuration error: {e}")
    # Invalid swarm_type, missing required params, etc.

Best Practices

Start simple: begin with SequentialWorkflow, then escalate to a heavier topology only when you can name the failure mode it fixes.
  1. Match swarm_type to the task — pipelines for known shapes, ensembles for quality, hierarchies for decomposition.
  2. Validate required params — e.g. AgentRearrange needs rearrange_flow; HeavySwarm honors the heavy_swarm_* knobs.
  3. Wrap run() in try/except — catch SwarmRouterRunError and SwarmRouterConfigError explicitly in production.
  4. Test with simple types first — confirm agents and tools work, then swap in heavier swarms.
  5. Enable autosave for production — durable config.json / state.json / metadata.json make incident analysis far cheaper.
Some swarm types have specific requirements: AgentRearrange requires rearrange_flow; MixtureOfAgents consumes the last agent in the list as the aggregator; DebateWithJudge consumes the third agent as the judge.
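
Because roster order carries meaning for these types, it can help to make the position-to-role mapping explicit before building the router. The sketch below encodes only the two positional rules stated above; assign_roles is a hypothetical helper, and the minimum roster sizes it enforces are assumptions.

```python
from typing import Dict, List


def assign_roles(swarm_type: str, agent_names: List[str]) -> Dict[str, List[str]]:
    """Map roster positions to roles for the two position-sensitive types named above."""
    if swarm_type == "MixtureOfAgents":
        # Assumption: at least one worker is needed besides the aggregator.
        if len(agent_names) < 2:
            raise ValueError("Need at least one worker plus an aggregator")
        return {"workers": agent_names[:-1], "aggregator": agent_names[-1:]}
    if swarm_type == "DebateWithJudge":
        if len(agent_names) < 3:
            raise ValueError("Need two debaters and a judge")
        return {"debaters": agent_names[:2], "judge": agent_names[2:3]}
    # Other types: no positional roles are described on this page.
    return {"agents": list(agent_names)}


roles = assign_roles("DebateWithJudge", ["Pro", "Con", "Judge"])
print(roles)  # {'debaters': ['Pro', 'Con'], 'judge': ['Judge']}
```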

Configuration Reference

Complete example with the most common options:
router = SwarmRouter(
    id="my-swarm-123",
    name="Production-Swarm",
    description="Production multi-agent system",
    swarm_type="HierarchicalSwarm",
    agents=agents,
    max_loops=2,
    output_type="dict",
    autosave=True,
    autosave_use_timestamp=True,
    multi_agent_collab_prompt=True,
    list_all_agents=True,
    shared_memory_system=memory,
    telemetry_enabled=True,
    verbose=True,
)