# PlannerWorkerSwarm

> A planner-worker-judge architecture for parallel multi-agent task execution with optimistic concurrency

## Overview

The `PlannerWorkerSwarm` implements a planner-worker-judge architecture for parallel multi-agent task execution. Based on Cursor's ["Scaling long-running autonomous coding"](https://cursor.com/blog/scaling-agents) research, it separates planning from execution: a planner decomposes goals into prioritized tasks, worker agents claim and execute tasks concurrently from a shared queue, and a judge evaluates the cycle results.

```mermaid theme={null}
graph TD
    A[User Task] --> B[Planner Agent]
    B --> C[TaskQueue]
    C --> D1[Worker 1]
    C --> D2[Worker 2]
    C --> D3[Worker N]
    D1 --> E[Results]
    D2 --> E
    D3 --> E
    E --> F[Judge Agent]
    F -->|Complete| G[Output]
    F -->|Gaps| H[Replan with feedback]
    F -->|Drift| I[Fresh Start]
    H --> B
    I --> B
```

The swarm follows a cycle-based workflow:

1. **Planning**: A planner agent decomposes the goal into concrete, prioritized tasks with dependencies
2. **Execution**: Worker agents independently claim tasks from a shared queue and execute them concurrently via `ThreadPoolExecutor` -- no worker-to-worker coordination
3. **Evaluation**: A judge agent evaluates the combined results and decides: complete, fill gaps, or fresh start
4. **Iteration**: If not complete, the planner receives judge feedback and produces new tasks for the next cycle
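The four steps can be sketched as a plain loop. This is a minimal illustration, not the library's implementation. `run_cycles`, `plan`, `execute`, and `judge` are hypothetical stand-ins, the verdict is a plain dict whose keys borrow the documented `CycleVerdict` field names, and execution is sequential here for brevity. In the real swarm, execution happens concurrently in a thread pool.

```python
from typing import Callable, Dict, List, Optional


def run_cycles(
    goal: str,
    plan: Callable[[str, Optional[str]], List[str]],
    execute: Callable[[str], str],
    judge: Callable[[str, List[str]], Dict],
    max_loops: int = 1,
) -> List[str]:
    """Hypothetical planner-worker-judge loop, for illustration only."""
    results: List[str] = []
    feedback: Optional[str] = None
    for _ in range(max_loops):
        tasks = plan(goal, feedback)                      # 1. Planning
        results += [execute(task) for task in tasks]      # 2. Execution (sequential here)
        verdict = judge(goal, results)                    # 3. Evaluation
        if verdict.get("is_complete"):
            break
        if verdict.get("needs_fresh_start"):
            results = []                                  # discard all prior work
        feedback = verdict.get("follow_up_instructions")  # 4. Iteration
    return results
```

The loop exits early on a "complete" verdict, so `max_loops` acts as an upper bound rather than a fixed cycle count.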

## Installation

```bash theme={null}
pip install -U swarms
```

## Attributes

<ParamField path="name" type="str" default="PlannerWorkerSwarm">
  Name identifier for this swarm instance
</ParamField>

<ParamField path="description" type="str" default="A planner-worker execution swarm">
  Description of the swarm's purpose
</ParamField>

<ParamField path="agents" type="List[Union[Agent, Callable]]" required>
  Worker agents that execute tasks. Must not be empty.
</ParamField>

<ParamField path="max_loops" type="int" default="1">
  Maximum planner-worker-judge cycles (must be greater than 0)
</ParamField>

<ParamField path="planner_model_name" type="str" default="gpt-5.4">
  Model for the planner agent
</ParamField>

<ParamField path="judge_model_name" type="str" default="gpt-5.4">
  Model for the judge agent
</ParamField>

<ParamField path="max_planner_depth" type="int" default="1">
  Max recursive sub-planner depth. `1` = no sub-planners; `2` = CRITICAL tasks are decomposed once.
</ParamField>

<ParamField path="worker_timeout" type="Optional[float]" default="None">
  Max seconds for the entire worker pool per cycle
</ParamField>

<ParamField path="task_timeout" type="Optional[float]" default="None">
  Max seconds per individual task execution
</ParamField>

<ParamField path="max_workers" type="Optional[int]" default="None">
  Max concurrent worker threads. Defaults to `min(len(agents), os.cpu_count())`.
</ParamField>

<ParamField path="output_type" type="OutputType" default="dict-all-except-first">
  Format for the final result
</ParamField>

<ParamField path="autosave" type="bool" default="False">
  Whether to save conversation history
</ParamField>

<ParamField path="verbose" type="bool" default="False">
  Enable verbose logging
</ParamField>

**Raises:**

| Exception    | Condition                                     |
| ------------ | --------------------------------------------- |
| `ValueError` | If no agents are provided or `max_loops <= 0` |
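The validation rules above and the documented `max_workers` default fit in a few lines. The helper `resolve_max_workers` is hypothetical and is not part of the swarms API. It also assumes a fallback of 1 when `os.cpu_count()` returns `None`, which the standard library allows.

```python
import os
from typing import Any, List, Optional


def resolve_max_workers(
    agents: List[Any], max_loops: int, max_workers: Optional[int] = None
) -> int:
    """Hypothetical check mirroring the documented constructor rules."""
    if not agents:
        raise ValueError("PlannerWorkerSwarm requires at least one agent")
    if max_loops <= 0:
        raise ValueError("max_loops must be greater than 0")
    if max_workers is not None:
        return max_workers
    # Documented default: min(len(agents), os.cpu_count()); cpu_count() may be None.
    return min(len(agents), os.cpu_count() or 1)
```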

## Methods

### run()

Executes the planner-worker-judge cycle up to `max_loops` times or until the judge declares the goal complete.

```python theme={null}
def run(self, task: Optional[str] = None, img: Optional[str] = None) -> Any
```

**Parameters:**

* `task` (str): The goal to accomplish
* `img` (str, optional): Optional image input

**Returns:** Formatted conversation history per `output_type`

**Raises:**

* `ValueError`: If `task` is not provided

### get\_status()

Returns a structured status report of the swarm and its task queue.

```python theme={null}
def get_status(self) -> Dict[str, Any]
```

**Returns:** Status dict with `name`, `original_task`, and `queue`. The `queue` entry contains `total`, `progress`, `status_counts`, and `tasks`, a list of per-task details (including `status`, `title`, and `assigned_worker`).

## Usage Examples

### Quick Start

```python theme={null}
from swarms import Agent
from swarms.structs.planner_worker_swarm import PlannerWorkerSwarm

swarm = PlannerWorkerSwarm(
    agents=[
        Agent(agent_name="Research", agent_description="Gathers information", model_name="gpt-5.4", max_loops=1),
        Agent(agent_name="Analysis", agent_description="Analyzes data", model_name="gpt-5.4", max_loops=1),
    ],
    max_loops=1,
    max_workers=2,
    worker_timeout=120,
)

result = swarm.run("What are the top 3 benefits of renewable energy?")
print(result)
```

### Multi-Cycle with Judge Feedback

Set `max_loops > 1` so the judge can request additional planning cycles when the goal is not yet achieved:

```python theme={null}
from swarms import Agent
from swarms.structs.planner_worker_swarm import PlannerWorkerSwarm

workers = [
    Agent(
        agent_name="Research-Agent",
        agent_description="Gathers factual information and data",
        model_name="gpt-5.4",
        max_loops=1,
    ),
    Agent(
        agent_name="Analysis-Agent",
        agent_description="Analyzes data and identifies patterns",
        model_name="gpt-5.4",
        max_loops=1,
    ),
]

swarm = PlannerWorkerSwarm(
    name="Research-Swarm",
    agents=workers,
    max_loops=3,           # up to 3 planner-worker-judge cycles
    max_workers=2,         # one thread per worker agent
    worker_timeout=120,
)

result = swarm.run(
    task="Produce a comprehensive market report on the EV industry "
    "covering manufacturers, technology trends, adoption challenges, "
    "regional differences, and a 5-year outlook."
)
print(result)
```

The judge evaluates each cycle. An illustrative progression (actual feedback varies from run to run):

* **Cycle 1**: Judge finds gaps ("missing regional analysis") -- planner creates targeted tasks
* **Cycle 2**: Judge finds remaining issues ("outlook section too shallow") -- planner fills gaps
* **Cycle 3**: Judge marks complete with quality 9/10

### Recursive Sub-Planners

Set `max_planner_depth > 1` to automatically decompose CRITICAL-priority tasks via sub-planner agents:

```python theme={null}
# `workers` is the list of worker agents defined in the previous example
swarm = PlannerWorkerSwarm(
    agents=workers,
    max_planner_depth=2,   # CRITICAL tasks decomposed once
)

result = swarm.run(
    task="Design and implement a complete REST API for a task management system"
)
```

When the top-level planner produces a CRITICAL task (e.g., "Design the database schema and API endpoints"), a sub-planner decomposes it into more granular subtasks. The original task is then cancelled and replaced by those subtasks.

### Timeouts

```python theme={null}
# `workers` is the list of worker agents defined in the earlier example
swarm = PlannerWorkerSwarm(
    agents=workers,
    worker_timeout=120,    # 2 min max for entire worker phase per cycle
    task_timeout=30,       # 30s max per individual task (detects stuck workers)
)
```

* `worker_timeout`: Total wall time for the worker pool per cycle. Workers stop claiming new tasks after this deadline.
* `task_timeout`: Per-task execution limit. If exceeded, the task fails with a `TimeoutError` and may be retried.
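Both limits can be approximated with the standard library's `concurrent.futures`. This is a minimal sketch under stated assumptions, not the swarm's code. `run_with_task_timeout` and `worker_loop` are hypothetical names, and a plain list of callables stands in for the `TaskQueue`. Note that `Future.result(timeout=...)` only stops *waiting*. The nested thread keeps running in the background, which is why a per-task timeout carries a small thread overhead.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Callable, List, Optional


def run_with_task_timeout(fn: Callable[[], str], task_timeout: Optional[float]) -> str:
    """Run fn in a nested thread; raise TimeoutError if it exceeds task_timeout."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        return pool.submit(fn).result(timeout=task_timeout)
    except FutureTimeout:
        raise TimeoutError(f"task exceeded {task_timeout}s")
    finally:
        pool.shutdown(wait=False)  # do not block on a stuck task


def worker_loop(
    tasks: List[Callable[[], str]],
    worker_timeout: Optional[float],
    task_timeout: Optional[float],
) -> List[str]:
    """Take tasks (mutating the list) until it is empty or the deadline passes."""
    deadline = None if worker_timeout is None else time.monotonic() + worker_timeout
    results: List[str] = []
    while tasks:
        if deadline is not None and time.monotonic() >= deadline:
            break  # stop claiming new tasks after the worker-phase deadline
        fn = tasks.pop(0)
        try:
            results.append(run_with_task_timeout(fn, task_timeout))
        except TimeoutError as exc:
            results.append(f"FAILED: {exc}")  # a real queue might schedule a retry here
    return results
```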

### Checking Swarm Status

```python theme={null}
status = swarm.get_status()
print(f"Progress: {status['queue']['progress']}")
for task in status["queue"]["tasks"]:
    print(f"  [{task['status']}] {task['title']} -> {task['assigned_worker']}")
```

### SwarmRouter Integration

`PlannerWorkerSwarm` is available as a swarm type in `SwarmRouter`:

```python theme={null}
from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter

workers = [
    Agent(agent_name="W1", model_name="gpt-5.4", max_loops=1),
    Agent(agent_name="W2", model_name="gpt-5.4", max_loops=1),
]

router = SwarmRouter(agents=workers, swarm_type="PlannerWorkerSwarm")
result = router.run("Analyze the competitive landscape of cloud computing providers")
```

## Architecture

### Design Principles (from the Cursor blog)

| Cursor Principle                        | Implementation                                                                                                          |
| --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- |
| Planners plan, workers execute          | `_run_planner()` produces `PlannerTaskSpec`; workers get `WORKER_SYSTEM_PROMPT` enforcing "only execute, never plan"    |
| No worker-to-worker coordination        | Workers interact only through the `TaskQueue` (`claim()` is atomic); they share no other state                          |
| No locks / optimistic concurrency       | `TaskQueue` uses a version field per task; `claim()` is atomic under a minimal lock held only for the state change, so workers never block each other during task execution |
| Judge-driven cycles with fresh start    | `CycleVerdict.needs_fresh_start` triggers `TaskQueue.clear()` to combat accumulated drift                               |
| Prompts matter more than infrastructure | `WORKER_SYSTEM_PROMPT`, `PLANNER_SYSTEM_PROMPT`, and `JUDGE_SYSTEM_PROMPT` enforce strict role boundaries               |
| Horizontal scaling                      | `ThreadPoolExecutor(max_workers=N)` -- tested with 200 tasks across 100 threads, zero double-claims                     |

### Task State Machine

```
PENDING --> CLAIMED --> RUNNING --> COMPLETED
                          |
                          v
                        FAILED --> PENDING (retry)
                          |
                          v (retries exhausted)
                        FAILED (permanent)

Any non-terminal --> CANCELLED
```
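The diagram can be encoded as an allowed-transitions table. This sketch uses hypothetical names (`TaskStatus`, `ALLOWED`, `can_transition`) and lowercase state values, and the real `PlannerTaskStatus` enum may differ. It gates the retry edge on the `retries` and `max_retries` fields from the `PlannerTask` schema. It also assumes that FAILED is treated as terminal for cancellation purposes.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    CLAIMED = "claimed"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Edges from the diagram; "any non-terminal --> CANCELLED" is expanded per state.
ALLOWED = {
    TaskStatus.PENDING: {TaskStatus.CLAIMED, TaskStatus.CANCELLED},
    TaskStatus.CLAIMED: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
    TaskStatus.RUNNING: {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED},
    TaskStatus.FAILED: {TaskStatus.PENDING},  # retry edge, gated in can_transition
    TaskStatus.COMPLETED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(
    src: TaskStatus, dst: TaskStatus, retries: int = 0, max_retries: int = 2
) -> bool:
    """Return True if src -> dst is a legal move in this sketch of the state machine."""
    if dst not in ALLOWED[src]:
        return False
    if src is TaskStatus.FAILED and dst is TaskStatus.PENDING:
        return retries < max_retries  # retries exhausted: FAILED becomes permanent
    return True
```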

### Cycle Flow

```
Cycle 1:
  Planner receives: original task
  Workers execute: tasks from queue
  Judge evaluates: complete? gaps? drift?

Cycle 2+ (if not complete):
  If fresh start: clear ALL tasks, planner starts from scratch with judge feedback
  If gap fill: clear non-terminal tasks, preserve completed results
  Planner receives: original task + judge feedback + identified gaps
  Workers execute: new tasks
  Judge re-evaluates
```

### How It Works

**Planner Agent**: Created internally each cycle. Uses structured output (`PlannerTaskSpec`) to produce a plan narrative and a list of concrete tasks with title, description, priority (0-3), and dependency titles. On subsequent cycles, the planner receives the judge's feedback (gaps + follow-up instructions) appended to the original task.
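The planner refers to dependencies by title, while `PlannerTask.depends_on` holds task IDs, so the titles must be resolved at some point. The following is a minimal sketch of that step. It assumes each planned task is a dict with `title`, `description`, `priority`, and `depends_on_titles` keys. Those key names, the helper `materialize`, and the exact UUID formatting are hypothetical and do not reproduce the `PlannerTaskSpec` schema. Unknown titles are simply dropped.

```python
import uuid
from typing import Dict, List


def materialize(planned: List[Dict]) -> List[Dict]:
    """Assign ptask-{uuid} IDs, then translate dependency titles into IDs."""
    # Assumes titles are unique within one plan; duplicates would collide here.
    ids = {p["title"]: f"ptask-{uuid.uuid4().hex}" for p in planned}
    tasks = []
    for p in planned:
        tasks.append({
            "id": ids[p["title"]],
            "title": p["title"],
            "description": p["description"],
            "priority": p.get("priority", 1),  # NORMAL when unspecified
            "depends_on": [ids[t] for t in p.get("depends_on_titles", []) if t in ids],
        })
    return tasks
```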

**Worker Execution**: Each worker runs in a `ThreadPoolExecutor` thread, independently claiming tasks from a shared `TaskQueue`. Workers never coordinate with each other. In each loop iteration, a worker claims a task, resets its agent's memory, builds context (`WORKER_SYSTEM_PROMPT` + task description + dependency results), executes via `agent.run()`, and marks the task complete or failed.
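The per-task context can be pictured as a simple concatenation. This is a sketch only. The placeholder prompt text, the section headings, and the `build_worker_context` helper are hypothetical, and the real `WORKER_SYSTEM_PROMPT` wording is not reproduced here.

```python
from typing import Dict, List

# Placeholder text; the actual WORKER_SYSTEM_PROMPT lives in the swarms source.
WORKER_PROMPT_PLACEHOLDER = "You are a worker. Execute the task; do not plan."


def build_worker_context(
    description: str, depends_on: List[str], results: Dict[str, str]
) -> str:
    """Assemble system prompt + task description + results of completed dependencies."""
    sections = [WORKER_PROMPT_PLACEHOLDER, f"Task:\n{description}"]
    dep_results = [
        f"[{dep_id}]\n{results[dep_id]}" for dep_id in depends_on if dep_id in results
    ]
    if dep_results:
        sections.append("Dependency results:\n" + "\n\n".join(dep_results))
    return "\n\n".join(sections)
```

Because memory is reset before each task, everything the worker needs must arrive through this context, which is why dependency results are injected explicitly.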

**Optimistic concurrency**: Every task has a `version` field. State transitions check the expected version; if another worker has modified the task in the meantime, the operation is rejected. This avoids the problems of lock-based coordination (deadlocks, forgotten releases).
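A version-checked transition is essentially compare-and-set. The sketch below is a self-contained illustration, and `VersionedTask` and `try_transition` are hypothetical names. The short module-level lock only makes the compare-and-bump atomic, in the spirit of the "atomic under a minimal lock" note in the design table. The sketch also checks the expected source status so that a late reader cannot re-claim an already claimed task.

```python
import threading
from dataclasses import dataclass


@dataclass
class VersionedTask:
    status: str = "pending"
    version: int = 0
    owner: str = ""


_lock = threading.Lock()


def try_transition(
    task: VersionedTask, expected_version: int, from_status: str, to_status: str, owner: str = ""
) -> bool:
    """Apply the change only if nobody modified the task since it was read."""
    with _lock:  # held only for the compare-and-bump, never during task execution
        if task.version != expected_version or task.status != from_status:
            return False  # stale read: another worker got there first
        task.status = to_status
        if owner:
            task.owner = owner
        task.version += 1
        return True
```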

**Claim priority**: Among tasks whose dependencies have all completed, workers claim the highest-priority task first and break ties by age (oldest first).
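The claim order can be expressed as a filter plus a sort key. This is a minimal sketch with hypothetical names (`QueuedTask`, `next_claimable`), and it uses lowercase string statuses. Ready tasks are ordered by `(-priority, created_at)`, so a higher priority wins and an older task breaks ties.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class QueuedTask:
    id: str
    priority: int  # 0=LOW, 1=NORMAL, 2=HIGH, 3=CRITICAL
    created_at: float
    status: str = "pending"
    depends_on: List[str] = field(default_factory=list)


def next_claimable(tasks: List[QueuedTask]) -> Optional[QueuedTask]:
    """Pick the pending task with satisfied dependencies, highest priority, then oldest."""
    done = {t.id for t in tasks if t.status == "completed"}
    ready = [
        t for t in tasks
        if t.status == "pending" and all(dep in done for dep in t.depends_on)
    ]
    return min(ready, key=lambda t: (-t.priority, t.created_at), default=None)
```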

**Judge Agent**: Created internally after workers complete. Evaluates the cycle results and produces a `CycleVerdict`:

| Field                    | Type            | Description                                                  |
| ------------------------ | --------------- | ------------------------------------------------------------ |
| `is_complete`            | `bool`          | Whether the goal has been fully achieved                     |
| `overall_quality`        | `int` (0-10)    | Quality score of the combined results                        |
| `summary`                | `str`           | Brief assessment of what was accomplished                    |
| `gaps`                   | `List[str]`     | Specific missing items or issues                             |
| `follow_up_instructions` | `Optional[str]` | Instructions for the planner if another cycle is needed      |
| `needs_fresh_start`      | `bool`          | Whether accumulated drift requires discarding all prior work |
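The following is a dependency-free sketch of the verdict, together with the parse fallback described under Error Handling (unparseable output yields `is_complete=False` with quality 0). The dataclass mirrors the documented fields. The `Verdict` name, the `parse_verdict` helper, and the JSON input format are assumptions, because the real swarm may obtain the verdict differently.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Verdict:
    is_complete: bool = False
    overall_quality: int = 0
    summary: str = ""
    gaps: List[str] = field(default_factory=list)
    follow_up_instructions: Optional[str] = None
    needs_fresh_start: bool = False


def parse_verdict(raw: str) -> Verdict:
    """Parse judge JSON; on any failure, fall back to 'not complete, quality 0'."""
    try:
        verdict = Verdict(**json.loads(raw))  # TypeError on non-dicts or unknown keys
        if not 0 <= verdict.overall_quality <= 10:
            raise ValueError("overall_quality out of range")
    except (ValueError, TypeError):  # JSONDecodeError is a ValueError subclass
        return Verdict(summary="Judge output could not be parsed")
    return verdict
```

Failing "closed" (treating a parse failure as incomplete) costs at most one extra cycle, whereas failing "open" could end the run with gaps unaddressed.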

### Fresh Start vs Gap Fill

|                      | Gap Fill (`needs_fresh_start=False`)                                            | Fresh Start (`needs_fresh_start=True`)                           |
| -------------------- | ------------------------------------------------------------------------------- | ---------------------------------------------------------------- |
| **When**             | Some tasks succeeded but gaps remain                                            | Systemic drift, contradictory results, fundamentally flawed plan |
| **What happens**     | Only non-terminal tasks are cleared; completed results are preserved as context | ALL tasks are discarded                                          |
| **Planner receives** | Original task + feedback + gaps                                                 | Original task + feedback (clean slate)                           |
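In queue terms, the two branches differ in which tasks survive into the next cycle and in what the planner is told. This is a minimal sketch with tasks as plain dicts. `prepare_next_cycle` and `build_replan_prompt` are hypothetical helpers, the prompt layout is invented for illustration, and the sketch assumes "failed" means retries are exhausted by the time the judge runs.

```python
from typing import Dict, List, Optional

TERMINAL = {"completed", "failed", "cancelled"}  # assumption: "failed" = retries exhausted


def prepare_next_cycle(tasks: List[Dict], needs_fresh_start: bool) -> List[Dict]:
    if needs_fresh_start:
        return []  # discard ALL tasks, including completed work
    # Gap fill: drop non-terminal tasks; completed results remain as context.
    return [t for t in tasks if t["status"] in TERMINAL]


def build_replan_prompt(
    original_task: str, follow_up: Optional[str], gaps: List[str], fresh_start: bool
) -> str:
    parts = [original_task]
    if follow_up:
        parts.append(f"Judge feedback: {follow_up}")
    if gaps and not fresh_start:  # a fresh start gets feedback only (clean slate)
        parts.append("Gaps to fill:\n" + "\n".join(f"- {g}" for g in gaps))
    return "\n\n".join(parts)
```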

## Schemas

### PlannerTask

Represents a single task in the shared queue.

| Field             | Type                | Default        | Description                                     |
| ----------------- | ------------------- | -------------- | ----------------------------------------------- |
| `id`              | `str`               | auto-generated | Unique task identifier (`ptask-{uuid}`)         |
| `title`           | `str`               | required       | Short, descriptive title                        |
| `description`     | `str`               | required       | Detailed description for the worker             |
| `priority`        | `TaskPriority`      | `NORMAL`       | `LOW(0)`, `NORMAL(1)`, `HIGH(2)`, `CRITICAL(3)` |
| `depends_on`      | `List[str]`         | `[]`           | Task IDs that must complete first               |
| `parent_task_id`  | `Optional[str]`     | `None`         | Parent task ID if decomposed by sub-planner     |
| `status`          | `PlannerTaskStatus` | `PENDING`      | Current status                                  |
| `assigned_worker` | `Optional[str]`     | `None`         | Name of the worker that claimed this task       |
| `result`          | `Optional[str]`     | `None`         | Execution result                                |
| `error`           | `Optional[str]`     | `None`         | Error message if failed                         |
| `retries`         | `int`               | `0`            | Retry attempts so far                           |
| `max_retries`     | `int`               | `2`            | Max retries before permanent failure            |
| `version`         | `int`               | `0`            | Optimistic concurrency counter                  |
| `created_at`      | `float`             | `time.time()`  | Unix timestamp                                  |
| `completed_at`    | `Optional[float]`   | `None`         | Completion timestamp                            |
| `metadata`        | `Dict`              | `{}`           | Arbitrary metadata                              |

### TaskPriority

| Value | Name       | Description                                                                         |
| ----- | ---------- | ----------------------------------------------------------------------------------- |
| 0     | `LOW`      | Background or optional tasks                                                        |
| 1     | `NORMAL`   | Standard priority (default)                                                         |
| 2     | `HIGH`     | Important tasks that should be prioritized                                          |
| 3     | `CRITICAL` | Must-do tasks; also triggers sub-planner decomposition when `max_planner_depth > 1` |
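The priority values and the sub-planner trigger can be sketched together. `Priority` and `should_decompose` are hypothetical names. The depth convention is an assumption chosen to match the documented behavior: the top-level planner runs at depth 1, so `max_planner_depth=1` means no sub-planners and `2` means CRITICAL tasks are decomposed once.

```python
from enum import IntEnum


class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


def should_decompose(priority: Priority, depth: int, max_planner_depth: int) -> bool:
    """depth is 1 for the top-level planner; a sub-planner runs at depth + 1."""
    return priority == Priority.CRITICAL and depth < max_planner_depth
```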

## Best Practices

| Best Practice             | Description                                                                                                          |
| ------------------------- | -------------------------------------------------------------------------------------------------------------------- |
| **Agent Specialization**  | Give each worker a specific expertise area (research, analysis, writing) so the planner can match tasks to strengths |
| **Agent Descriptions**    | Provide clear `agent_description` fields -- the planner uses these to understand what each worker can do             |
| **Single-Loop Workers**   | Set `max_loops=1` on worker agents -- the swarm's outer loop handles iteration                                       |
| **Model Selection**       | Use a capable model for the planner and judge (task decomposition and evaluation are harder than execution)          |
| **Reasonable max\_loops** | 1-3 cycles is typical; diminishing returns after that                                                                |
| **Timeouts**              | Always set `worker_timeout` in production to prevent runaway execution                                               |
| **Worker Count**          | `max_workers` should roughly match agent count -- more threads than agents won't help                                |
| **Dependencies**          | Use task dependencies when output from one task is needed as input to another                                        |
| **Fresh Start**           | Trust the judge's fresh start mechanism -- it's designed to combat the drift problem identified in Cursor's research |

## Error Handling

| Issue                             | Solution                                                                           |
| --------------------------------- | ---------------------------------------------------------------------------------- |
| No agents provided                | Pass at least one `Agent` in the `agents` list                                     |
| `max_loops <= 0`                  | Set `max_loops` to a positive integer                                              |
| No task provided                  | Pass a non-empty `task` string to `run()`                                          |
| Planner output parsing fails      | Check that `planner_model_name` supports structured output (function calling)      |
| Judge output parsing fails        | Defaults to `is_complete=False` with quality 0, so the cycle continues safely      |
| Worker stuck on a task            | Set `task_timeout` to detect and fail stuck tasks                                  |
| All workers idle but tasks remain | Tasks may be blocked on unmet dependencies -- check for circular dependency chains |
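A depth-first search over task IDs and their `depends_on` lists can confirm a suspected circular dependency. `find_cycle` is a hypothetical standalone helper, and you would build its input mapping from your own task definitions. Dependencies that do not appear as keys are ignored.

```python
from typing import Dict, List, Optional


def find_cycle(depends_on: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one dependency cycle as a list of task IDs, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {node: WHITE for node in depends_on}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = GRAY
        path.append(node)
        for dep in depends_on.get(node, []):
            if color.get(dep, WHITE) == GRAY:
                return path[path.index(dep):] + [dep]  # back edge closes a cycle
            if color.get(dep, WHITE) == WHITE and dep in depends_on:
                found = visit(dep)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for node in depends_on:
        if color[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None
```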

## Performance Considerations

| Consideration              | Description                                                                                                                                               |
| -------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Parallelism**            | The primary performance advantage -- N workers execute N tasks simultaneously instead of sequentially                                                     |
| **Planner/Judge Overhead** | Each cycle adds 2 LLM calls (planner + judge) on top of the worker calls. If latency or cost matters, consider faster models for these roles, but weigh this against plan and evaluation quality (see Best Practices) |
| **Sub-Planner Cost**       | `max_planner_depth > 1` adds one planner call per CRITICAL task. Only enable when tasks genuinely need decomposition                                      |
| **Memory Reset**           | Worker memory is reset between tasks, which adds a small overhead but prevents context pollution                                                          |
| **Task Timeout**           | Per-task timeout spawns a nested thread -- slight overhead, but essential for detecting stuck workers                                                     |
| **Queue Contention**       | The `TaskQueue` lock is held only during claim/transition operations (microseconds). Tested with 200 tasks across 100 threads with zero contention issues |

## Source Code

View the [source code on GitHub](https://github.com/kyegomez/swarms/blob/master/swarms/structs/planner_worker_swarm.py)
