> ## Documentation Index
> Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
> Use this file to discover all available pages before exploring further.

# SubagentRegistry

> Background async subagent execution with task tracking, retry policies, depth-limited recursion, and result aggregation

## Overview

`SubagentRegistry` runs agent tasks in the background on a `ThreadPoolExecutor`, tracking each as a `SubagentTask` with status, retries, depth, and parent linkage. Use it when one agent needs to fan out work to other agents — recursively if needed — and gather results later.

The module exports three symbols:

| Export             | Kind       | Purpose                                                  |
| ------------------ | ---------- | -------------------------------------------------------- |
| `SubagentRegistry` | class      | Spawns and tracks tasks, gathers results                 |
| `SubagentTask`     | dataclass  | Per-task record with status, result, retry info          |
| `TaskStatus`       | `str` enum | `PENDING`, `RUNNING`, `COMPLETED`, `FAILED`, `CANCELLED` |

## Installation

```bash theme={null}
pip install -U swarms
```

## TaskStatus

```python theme={null}
from swarms import TaskStatus

TaskStatus.PENDING     # spawned but not yet started
TaskStatus.RUNNING     # currently executing
TaskStatus.COMPLETED   # finished successfully
TaskStatus.FAILED      # exhausted retries
TaskStatus.CANCELLED   # cancelled before completion
```

Backed by `(str, Enum)`, so the values compare equal to plain strings (`TaskStatus.PENDING == "pending"`).

## SubagentTask

Dataclass describing a single in-flight or completed task. Populated by `SubagentRegistry.spawn()`.

<ParamField path="id" type="str">
  Unique task ID, e.g. `task-a1b2c3d4`.
</ParamField>

<ParamField path="agent" type="Any">
  The agent instance assigned to this task.
</ParamField>

<ParamField path="task_str" type="str">
  The prompt/task handed to `agent.run(...)`.
</ParamField>

<ParamField path="status" type="TaskStatus" default="TaskStatus.PENDING">
  Current execution status.
</ParamField>

<ParamField path="result" type="Any" default="None">
  Return value from the agent — set when `status == COMPLETED`.
</ParamField>

<ParamField path="error" type="Exception | None" default="None">
  Last exception raised — set when `status == FAILED`.
</ParamField>

<ParamField path="future" type="concurrent.futures.Future | None" default="None">
  Underlying `Future` returned by the thread pool.
</ParamField>

<ParamField path="parent_id" type="str | None" default="None">
  ID of the parent task that spawned this one, if any.
</ParamField>

<ParamField path="depth" type="int" default="0">
  Recursion depth — incremented when an agent spawns another subagent.
</ParamField>

<ParamField path="retries" type="int" default="0">
  Number of retries used so far.
</ParamField>

<ParamField path="max_retries" type="int" default="0">
  Retry budget for this task.
</ParamField>

<ParamField path="retry_on" type="List[Type[Exception]] | None" default="None">
  Whitelist of exception classes that trigger retries. `None` retries on any exception.
</ParamField>

<ParamField path="created_at" type="float">
  Unix timestamp when the task was spawned.
</ParamField>

<ParamField path="completed_at" type="float | None" default="None">
  Unix timestamp when the task entered a terminal state.
</ParamField>

## SubagentRegistry

### Constructor

```python theme={null}
from swarms import SubagentRegistry

registry = SubagentRegistry(max_depth=3, max_workers=None)
```

<ParamField path="max_depth" type="int" default="3">
  Maximum recursion depth. `spawn()` raises `ValueError` if `depth > max_depth`.
</ParamField>

<ParamField path="max_workers" type="int | None" default="None">
  Thread-pool size. `None` defers to `ThreadPoolExecutor`'s default.
</ParamField>

### Methods

#### spawn()

Submit an agent task to the pool. Returns the new task ID synchronously; the agent runs in the background.

```python theme={null}
def spawn(
    self,
    agent: Any,
    task: str,
    parent_id: Optional[str] = None,
    depth: int = 0,
    max_retries: int = 0,
    retry_on: Optional[List[Type[Exception]]] = None,
    fail_fast: bool = True,
) -> str
```

<ParamField path="agent" type="Any" required>
  Agent instance with a `.run(task)` method.
</ParamField>

<ParamField path="task" type="str" required>
  The prompt to run.
</ParamField>

<ParamField path="parent_id" type="str | None" default="None">
  Set when this task was spawned by another task. Used for tracking trees.
</ParamField>

<ParamField path="depth" type="int" default="0">
  Recursion depth. Spawning from inside another task increments this.
</ParamField>

<ParamField path="max_retries" type="int" default="0">
  Retry budget on failure.
</ParamField>

<ParamField path="retry_on" type="List[Type[Exception]] | None" default="None">
  Only retry on these exception types. `None` retries on any exception.
</ParamField>

<ParamField path="fail_fast" type="bool" default="True">
  When `True`, the underlying thread re-raises on final failure (the exception surfaces when you call `future.result()`). When `False`, the failure is captured on the `SubagentTask` and the thread returns `None`.
</ParamField>

**Raises:** `ValueError` if `depth > max_depth`.

#### get\_task()

Look up a `SubagentTask` by ID.

```python theme={null}
def get_task(self, task_id: str) -> SubagentTask
```

**Raises:** `KeyError` if the ID is not in the registry.

#### get\_results()

Return a `Dict[task_id, Any]` for every completed or failed task. Failed tasks map to their exception object.

```python theme={null}
def get_results(self) -> Dict[str, Any]
```

#### cancel()

Attempt to cancel a not-yet-started task. Returns `True` if the underlying `Future` accepted the cancellation.

```python theme={null}
def cancel(self, task_id: str) -> bool
```

#### gather()

Block until tasks complete and return a list of results.

```python theme={null}
def gather(
    self,
    strategy: str = "wait_all",
    timeout: Optional[float] = None,
) -> List[Any]
```

<ParamField path="strategy" type="&#x22;wait_all&#x22; | &#x22;wait_first&#x22;" default="&#x22;wait_all&#x22;">
  Wait policy. `"wait_all"` returns once every pending task settles; `"wait_first"` returns as soon as one does.
</ParamField>

<ParamField path="timeout" type="float | None" default="None">
  Max seconds to wait. `None` blocks indefinitely.
</ParamField>

**Returns:** Mixed list — successful tasks contribute their result, failed tasks contribute their exception object.

#### shutdown()

Tear down the thread pool without waiting for outstanding tasks.

```python theme={null}
def shutdown(self) -> None
```

#### tasks

Read-only property — snapshot of all known tasks as `Dict[task_id, SubagentTask]`.

```python theme={null}
@property
def tasks(self) -> Dict[str, SubagentTask]
```

## Usage Examples

### Fan Out, Gather, Map by Agent

```python theme={null}
from swarms import Agent, SubagentRegistry

researchers = [
    Agent(agent_name=f"Researcher-{i}", model_name="claude-sonnet-4-6", max_loops=1)
    for i in range(3)
]

registry = SubagentRegistry(max_workers=4)

topics = [
    "Recent advances in RAG retrieval",
    "Long-context LLM scaling laws",
    "Agent tool-use benchmarks",
]

task_ids = [
    registry.spawn(agent, topic)
    for agent, topic in zip(researchers, topics)
]

results = registry.gather(strategy="wait_all")
for tid in task_ids:
    print(registry.get_task(tid).status, registry.get_task(tid).result[:80])

registry.shutdown()
```

### Retries on Specific Exceptions

Retry only on transient network errors; surface anything else immediately.

```python theme={null}
import httpx
from swarms import Agent, SubagentRegistry

registry = SubagentRegistry()
agent = Agent(agent_name="Crawler", model_name="claude-sonnet-4-6", max_loops=1)

task_id = registry.spawn(
    agent=agent,
    task="Fetch and summarize https://example.com/whitepaper",
    max_retries=3,
    retry_on=[httpx.TimeoutException, httpx.NetworkError],
    fail_fast=False,
)

registry.gather(timeout=60)
task = registry.get_task(task_id)
print(task.status, task.retries, task.result or task.error)
```

### Take the First Successful Result

```python theme={null}
from swarms import Agent, SubagentRegistry

registry = SubagentRegistry()
candidates = [
    Agent(agent_name="GPT", model_name="gpt-4.1", max_loops=1),
    Agent(agent_name="Claude", model_name="claude-sonnet-4-6", max_loops=1),
    Agent(agent_name="Mistral", model_name="mistral-large", max_loops=1),
]

for agent in candidates:
    registry.spawn(agent, "Draft a one-sentence product tagline for a developer-tools startup")

first_results = registry.gather(strategy="wait_first", timeout=30)
print(first_results[0])
```

### Depth-Limited Recursion

```python theme={null}
from swarms import Agent, SubagentRegistry

registry = SubagentRegistry(max_depth=2)
root = Agent(agent_name="Root", model_name="claude-sonnet-4-6", max_loops=1)

root_id = registry.spawn(root, "Plan a research project", depth=0)

# From inside a tool/handler the root agent can do:
# child_id = registry.spawn(child_agent, "Investigate sub-topic", parent_id=root_id, depth=1)
# Spawning at depth=3 would raise ValueError because max_depth=2.
```

## Source Code

View the [source on GitHub](https://github.com/kyegomez/swarms/blob/master/swarms/structs/async_subagent.py).
