Overview

SubagentRegistry runs agent tasks in the background on a ThreadPoolExecutor, tracking each as a SubagentTask with status, retries, depth, and parent linkage. Use it when one agent needs to fan out work to other agents — recursively if needed — and gather results later. The module exports three symbols:
Export | Kind | Purpose
SubagentRegistry | class | Spawns and tracks tasks, gathers results
SubagentTask | dataclass | Per-task record with status, result, retry info
TaskStatus | str enum | PENDING, RUNNING, COMPLETED, FAILED, CANCELLED

Installation

pip install -U swarms

TaskStatus

from swarms import TaskStatus

TaskStatus.PENDING     # spawned but not yet started
TaskStatus.RUNNING     # currently executing
TaskStatus.COMPLETED   # finished successfully
TaskStatus.FAILED      # exhausted retries
TaskStatus.CANCELLED   # cancelled before completion
Backed by (str, Enum), so the values compare equal to plain strings (TaskStatus.PENDING == "pending").
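
The string comparison follows from the (str, Enum) mixin. The sketch below re-declares a local stand-in enum for illustration; it is not the library's source. Only the "pending" value is confirmed above, the other lowercase values are assumed, and is_terminal is a hypothetical helper.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Local stand-in for illustration; not imported from swarms."""

    PENDING = "pending"      # value confirmed by the text above
    RUNNING = "running"      # assumed: lowercase member name
    COMPLETED = "completed"  # assumed
    FAILED = "failed"        # assumed
    CANCELLED = "cancelled"  # assumed


def is_terminal(status: TaskStatus) -> bool:
    """Hypothetical helper: True once a task can no longer change state."""
    return status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED)


# Members compare equal to plain strings and can be looked up by value.
pending_matches_string = TaskStatus.PENDING == "pending"
looked_up = TaskStatus("pending")
```

Because members are also strings, they can be compared against values read back from JSON or logs without an explicit conversion.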

SubagentTask

Dataclass describing a single in-flight or completed task. Populated by SubagentRegistry.spawn().
id (str): Unique task ID, e.g. task-a1b2c3d4.
agent (Any): The agent instance assigned to this task.
task_str (str): The prompt/task handed to agent.run(...).
status (TaskStatus, default: TaskStatus.PENDING): Current execution status.
result (Any, default: None): Return value from the agent; set when status == COMPLETED.
error (Exception | None, default: None): Last exception raised; set when status == FAILED.
future (concurrent.futures.Future | None, default: None): Underlying Future returned by the thread pool.
parent_id (str | None, default: None): ID of the parent task that spawned this one, if any.
depth (int, default: 0): Recursion depth, incremented when an agent spawns another subagent.
retries (int, default: 0): Number of retries used so far.
max_retries (int, default: 0): Retry budget for this task.
retry_on (List[Type[Exception]] | None, default: None): Whitelist of exception classes that trigger retries. None retries on any exception.
created_at (float): Unix timestamp when the task was spawned.
completed_at (float | None, default: None): Unix timestamp when the task entered a terminal state.

SubagentRegistry

Constructor

from swarms import SubagentRegistry

registry = SubagentRegistry(max_depth=3, max_workers=None)
max_depth (int, default: 3): Maximum recursion depth. spawn() raises ValueError if depth > max_depth.
max_workers (int | None, default: None): Thread-pool size. None defers to ThreadPoolExecutor’s default.

Methods

spawn()

Submit an agent task to the pool. Returns the new task ID synchronously; the agent runs in the background.
def spawn(
    self,
    agent: Any,
    task: str,
    parent_id: Optional[str] = None,
    depth: int = 0,
    max_retries: int = 0,
    retry_on: Optional[List[Type[Exception]]] = None,
    fail_fast: bool = True,
) -> str
agent (Any, required): Agent instance with a .run(task) method.
task (str, required): The prompt to run.
parent_id (str | None, default: None): Set when this task was spawned by another task. Used for tracking trees.
depth (int, default: 0): Recursion depth. Spawning from inside another task increments this.
max_retries (int, default: 0): Retry budget on failure.
retry_on (List[Type[Exception]] | None, default: None): Only retry on these exception types. None retries on any exception.
fail_fast (bool, default: True): When True, the underlying thread re-raises on final failure (the exception surfaces when you call future.result()). When False, the failure is captured on the SubagentTask and the thread returns None.
Raises: ValueError if depth > max_depth.
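
The interaction of max_retries, retry_on, and fail_fast can be modeled in a few lines. This is a hypothetical sketch of the contract as documented above, not the registry's actual implementation; run_with_retries and flaky are illustrative names.

```python
from typing import Any, Callable, List, Optional, Type


def run_with_retries(
    fn: Callable[[], Any],
    max_retries: int = 0,
    retry_on: Optional[List[Type[Exception]]] = None,
    fail_fast: bool = True,
) -> Any:
    """Hypothetical model of the documented retry contract, not swarms code."""
    retries = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            # retry_on=None means any exception is retryable.
            retryable = retry_on is None or isinstance(exc, tuple(retry_on))
            if retryable and retries < max_retries:
                retries += 1
                continue
            if fail_fast:
                raise  # surfaces to the caller, as future.result() would
            return None  # swallowed; the real registry records it on the task


attempts = {"count": 0}


def flaky() -> str:
    """Fails twice with TimeoutError, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("transient")
    return "ok"


result = run_with_retries(flaky, max_retries=3, retry_on=[TimeoutError])
```

In this model an exception outside retry_on ends the task on the first failure, regardless of the remaining retry budget.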

get_task()

Look up a SubagentTask by ID.
def get_task(self, task_id: str) -> SubagentTask
Raises: KeyError if the ID is not in the registry.

get_results()

Return a Dict[task_id, Any] for every completed or failed task. Failed tasks map to their exception object.
def get_results(self) -> Dict[str, Any]
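
Since failed tasks map to their exception object, callers usually need to separate the two kinds of value. A minimal sketch, assuming the dict shape described above; partition_results and the sample IDs are hypothetical.

```python
from typing import Any, Dict, Tuple


def partition_results(
    results: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Exception]]:
    """Split a get_results()-shaped dict into successes and failures."""
    succeeded: Dict[str, Any] = {}
    failed: Dict[str, Exception] = {}
    for task_id, value in results.items():
        if isinstance(value, Exception):
            failed[task_id] = value
        else:
            succeeded[task_id] = value
    return succeeded, failed


# Example data shaped like get_results() output (IDs are made up).
sample = {
    "task-aaaa0001": "summary text",
    "task-aaaa0002": TimeoutError("timed out"),
}
succeeded, failed = partition_results(sample)
```

This check would misclassify an agent that legitimately returns an exception object as its result; in that case, inspect each task's status through get_task() instead.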

cancel()

Attempt to cancel a not-yet-started task. Returns True if the underlying Future accepted the cancellation.
def cancel(self, task_id: str) -> bool
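
The return value mirrors the standard library: Future.cancel() succeeds only while the work is still queued. The sketch below demonstrates this with a plain ThreadPoolExecutor rather than the registry; blocker and the event names are illustrative.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker() -> str:
    """Occupies the only worker until released."""
    started.set()
    release.wait()
    return "done"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocker)
    started.wait()  # make sure the first task is executing
    queued = pool.submit(lambda: "never runs")

    cancelled_queued = queued.cancel()    # still queued, so this succeeds
    cancelled_running = running.cancel()  # already running, so this fails

    release.set()  # let the running task finish before the pool exits
```

A task that has already started cannot be interrupted this way, so long-running agents need their own stop condition.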

gather()

Block until tasks complete and return a list of results.
def gather(
    self,
    strategy: str = "wait_all",
    timeout: Optional[float] = None,
) -> List[Any]
strategy ("wait_all" | "wait_first", default: "wait_all"): Wait policy. "wait_all" returns once every pending task settles; "wait_first" returns as soon as one does.
timeout (float | None, default: None): Max seconds to wait. None blocks indefinitely.
Returns: Mixed list — successful tasks contribute their result, failed tasks contribute their exception object.
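
The two strategies resemble the return_when modes of concurrent.futures.wait. The sketch below shows that standard-library behavior directly; whether gather() is built on it is an assumption, and fast, slow, and gate are illustrative names.

```python
import threading
from concurrent.futures import (
    ALL_COMPLETED,
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    wait,
)

gate = threading.Event()


def fast() -> str:
    return "fast"


def slow() -> str:
    gate.wait()  # blocked until the main thread opens the gate
    return "slow"


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(fast), pool.submit(slow)]

    # Analogous to strategy="wait_first": return once any future settles.
    done_first, still_pending = wait(futures, return_when=FIRST_COMPLETED)
    first = [f.result() for f in done_first]

    gate.set()

    # Analogous to strategy="wait_all": return once every future settles.
    done_all, _ = wait(futures, return_when=ALL_COMPLETED)
    everything = sorted(f.result() for f in done_all)
```

With the standard-library wait(), a timeout returns whatever has finished so far rather than raising; how gather() behaves on timeout is not stated here, so check the result list before indexing into it.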

shutdown()

Tear down the thread pool without waiting for outstanding tasks.
def shutdown(self) -> None
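
Because shutdown() does not wait, one option is to gather inside a block that always shuts the pool down afterwards, even if spawning or gathering raises. A minimal sketch; managed_registry is a hypothetical helper and StubRegistry is a stand-in that models only shutdown().

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def managed_registry(registry: Any) -> Iterator[Any]:
    """Hypothetical helper: always call registry.shutdown() on exit."""
    try:
        yield registry
    finally:
        registry.shutdown()


class StubRegistry:
    """Stand-in exposing only shutdown(); swap in a real SubagentRegistry."""

    def __init__(self) -> None:
        self.closed = False

    def shutdown(self) -> None:
        self.closed = True


stub = StubRegistry()
try:
    with managed_registry(stub) as registry:
        # With a real registry: spawn(...) and gather(...) go here.
        raise RuntimeError("simulated failure while spawning or gathering")
except RuntimeError:
    pass
```

Call gather() before leaving the block if the results matter, since tasks still outstanding at shutdown are not awaited.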

tasks

Read-only property — snapshot of all known tasks as Dict[task_id, SubagentTask].
@property
def tasks(self) -> Dict[str, SubagentTask]
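
The snapshot can be turned into a parent-to-children map using each task's parent_id. A minimal sketch, assuming only the documented parent_id field; children_by_parent is a hypothetical helper, and SimpleNamespace objects stand in for real SubagentTask records.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict, List, Optional


def children_by_parent(tasks: Dict[str, Any]) -> Dict[Optional[str], List[str]]:
    """Group task IDs under the ID of the task that spawned them."""
    tree: Dict[Optional[str], List[str]] = defaultdict(list)
    for task_id, task in tasks.items():
        tree[task.parent_id].append(task_id)  # root tasks land under None
    return dict(tree)


# Stand-in data shaped like registry.tasks (IDs are made up).
fake_tasks = {
    "task-root": SimpleNamespace(parent_id=None, depth=0),
    "task-a": SimpleNamespace(parent_id="task-root", depth=1),
    "task-b": SimpleNamespace(parent_id="task-root", depth=1),
}
tree = children_by_parent(fake_tasks)
```

With a real registry, pass registry.tasks in place of fake_tasks.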

Usage Examples

Fan Out, Gather, Map by Agent

from swarms import Agent, SubagentRegistry

researchers = [
    Agent(agent_name=f"Researcher-{i}", model_name="claude-sonnet-4-6", max_loops=1)
    for i in range(3)
]

registry = SubagentRegistry(max_workers=4)

topics = [
    "Recent advances in RAG retrieval",
    "Long-context LLM scaling laws",
    "Agent tool-use benchmarks",
]

task_ids = [
    registry.spawn(agent, topic)
    for agent, topic in zip(researchers, topics)
]

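registry.gather(strategy="wait_all")
for tid in task_ids:
    task = registry.get_task(tid)
    # A failed task has result=None, so fall back to its error.
    output = task.result if task.error is None else task.error
    print(task.agent.agent_name, task.status, str(output)[:80])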

registry.shutdown()

Retries on Specific Exceptions

Retry only on transient network errors; surface anything else immediately.
import httpx
from swarms import Agent, SubagentRegistry

registry = SubagentRegistry()
agent = Agent(agent_name="Crawler", model_name="claude-sonnet-4-6", max_loops=1)

task_id = registry.spawn(
    agent=agent,
    task="Fetch and summarize https://example.com/whitepaper",
    max_retries=3,
    retry_on=[httpx.TimeoutException, httpx.NetworkError],
    fail_fast=False,
)

registry.gather(timeout=60)
task = registry.get_task(task_id)
print(task.status, task.retries, task.result or task.error)

Take the First Successful Result

from swarms import Agent, SubagentRegistry

registry = SubagentRegistry()
candidates = [
    Agent(agent_name="GPT", model_name="gpt-4.1", max_loops=1),
    Agent(agent_name="Claude", model_name="claude-sonnet-4-6", max_loops=1),
    Agent(agent_name="Mistral", model_name="mistral-large", max_loops=1),
]

for agent in candidates:
    registry.spawn(agent, "Draft a one-sentence product tagline for a developer-tools startup")

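first_results = registry.gather(strategy="wait_first", timeout=30)
# "wait_first" returns as soon as one task settles; a failed task
# contributes its exception object, so check before using the value.
if first_results and not isinstance(first_results[0], Exception):
    print(first_results[0])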

Depth-Limited Recursion

from swarms import Agent, SubagentRegistry

registry = SubagentRegistry(max_depth=2)
root = Agent(agent_name="Root", model_name="claude-sonnet-4-6", max_loops=1)

root_id = registry.spawn(root, "Plan a research project", depth=0)

# From inside a tool/handler the root agent can do:
# child_id = registry.spawn(child_agent, "Investigate sub-topic", parent_id=root_id, depth=1)
# Spawning at depth=3 would raise ValueError because max_depth=2.
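
Child depth can be derived from the parent's record instead of being hard-coded. The sketch below uses only the documented spawn() and get_task() calls; spawn_child is a hypothetical helper, and FakeRegistry is a stand-in that models nothing but the documented depth check.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional


def spawn_child(registry: Any, parent_id: str, agent: Any, task: str) -> str:
    """Spawn a subtask one level deeper than its parent."""
    parent = registry.get_task(parent_id)
    return registry.spawn(agent, task, parent_id=parent_id, depth=parent.depth + 1)


class FakeRegistry:
    """Stand-in for illustration; runs nothing, only tracks depth."""

    def __init__(self, max_depth: int = 3) -> None:
        self.max_depth = max_depth
        self._tasks: Dict[str, SimpleNamespace] = {}

    def spawn(self, agent: Any, task: str,
              parent_id: Optional[str] = None, depth: int = 0) -> str:
        if depth > self.max_depth:
            raise ValueError(f"depth {depth} exceeds max_depth {self.max_depth}")
        task_id = f"task-{len(self._tasks):08d}"
        self._tasks[task_id] = SimpleNamespace(parent_id=parent_id, depth=depth)
        return task_id

    def get_task(self, task_id: str) -> SimpleNamespace:
        return self._tasks[task_id]


registry = FakeRegistry(max_depth=2)
root_id = registry.spawn(None, "Plan a research project")
child_id = spawn_child(registry, root_id, None, "Investigate sub-topic")
grandchild_id = spawn_child(registry, child_id, None, "Check one source")

try:
    spawn_child(registry, grandchild_id, None, "Too deep")
    depth_error = None
except ValueError as exc:
    depth_error = exc  # depth=3 is rejected because max_depth=2
```

Deriving depth from the parent keeps the limit meaningful even when agents spawn from several levels of nesting.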

Source Code

View the source on GitHub.