# SequentialWorkflow

> A workflow system for executing agents in a sequential, ordered chain

## Overview

The `SequentialWorkflow` class runs a list of agents (or plain callables) in a fixed order. It builds and executes a workflow in which each agent runs in the specified position, passing the task and any optional data down the chain.
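
The hand-off described above can be pictured with plain Python callables. This is a minimal sketch of the idea, not the library's implementation; `run_chain` and the three step functions are hypothetical names introduced only for illustration.

```python
from typing import Callable, List


def run_chain(steps: List[Callable[[str], str]], task: str) -> str:
    """Feed the task to the first step, then pass each output to the next step."""
    current = task
    for step in steps:
        current = step(current)
    return current


# Hypothetical stand-ins for agents: each one wraps its input text.
def research(text: str) -> str:
    return f"notes({text})"


def write(text: str) -> str:
    return f"draft({text})"


def edit(text: str) -> str:
    return f"final({text})"


result = run_chain([research, write, edit], "AI in healthcare")
print(result)  # final(draft(notes(AI in healthcare)))
```

The nesting in the printed string mirrors the execution order: the first step's output is the innermost layer.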

## Key Features

* **Sequential Execution**: Agents execute one after another in a defined order
* **Synchronous and Asynchronous Support**: Both blocking and non-blocking execution modes
* **Batched Processing**: Execute multiple tasks through the same workflow
* **Concurrent Task Processing**: Run multiple tasks concurrently using thread pools
* **Team Awareness**: Agents can be aware of their position in the team structure
* **Auto-save Support**: Automatically save conversation history to workspace
* **Flexible Output Types**: Support for various output formats (dict, list, string)

## Installation

```bash theme={null}
pip install -U swarms
```

## Class Definition

```python theme={null}
class SequentialWorkflow:
    def __init__(
        self,
        id: str = "sequential_workflow",
        name: str = "SequentialWorkflow",
        description: str = "Sequential Workflow, where agents are executed in a sequence.",
        agents: List[Union[Agent, Callable]] = None,
        max_loops: int = 1,
        output_type: OutputType = "dict",
        shared_memory_system: callable = None,
        multi_agent_collab_prompt: bool = False,
        team_awareness: bool = False,
        autosave: bool = True,
        verbose: bool = False,
        drift_detection: bool = False,
        drift_threshold: float = 0.75,
        drift_model: str = "claude-sonnet-4-5",
        *args,
        **kwargs,
    )
```

## Parameters

<ParamField path="id" type="str" default="sequential_workflow">
  Unique identifier for the workflow instance
</ParamField>

<ParamField path="name" type="str" default="SequentialWorkflow">
  Human-readable name for the workflow
</ParamField>

<ParamField path="description" type="str" default="Sequential Workflow, where agents are executed in a sequence.">
  Description of the workflow's purpose
</ParamField>

<ParamField path="agents" type="List[Union[Agent, Callable]]" required>
  List of agents or callables to execute in sequence. Cannot be None or empty.
</ParamField>

<ParamField path="max_loops" type="int" default="1">
  Maximum number of times to execute the workflow. Must be greater than 0.
</ParamField>

<ParamField path="output_type" type="OutputType" default="dict">
  Format of the output from the workflow. Options include "dict", "list", "str", etc.
</ParamField>

<ParamField path="shared_memory_system" type="callable" default="None">
  Optional callable for managing shared memory between agents
</ParamField>

<ParamField path="multi_agent_collab_prompt" type="bool" default="False">
  If True, appends a collaborative prompt to each agent's system prompt
</ParamField>

<ParamField path="team_awareness" type="bool" default="False">
  Whether agents are aware of the team structure and their position
</ParamField>

<ParamField path="autosave" type="bool" default="True">
  Whether to enable autosaving of conversation history to workspace
</ParamField>

<ParamField path="verbose" type="bool" default="False">
  Whether to enable verbose logging
</ParamField>

<ParamField path="drift_detection" type="bool" default="False">
  If `True`, a judge agent scores the final output's semantic alignment with the original task after the pipeline completes. If the score falls below `drift_threshold`, the pipeline reruns and the cycle repeats until the score meets the threshold. If the judge output cannot be parsed, drift checking is skipped and the last result is returned as-is.
</ParamField>

<ParamField path="drift_threshold" type="float" default="0.75">
  Minimum alignment score (0.0–1.0) for the output to be considered acceptable. When the score falls below this value, a warning is logged and the pipeline reruns. Used only when `drift_detection=True`.
</ParamField>

<ParamField path="drift_model" type="str" default="claude-sonnet-4-5">
  Model used by the drift detection judge agent. Used only when `drift_detection=True`.
</ParamField>

## Methods

### `run(task, img=None, imgs=None, *args, **kwargs)`

Executes the given task through the agents, in the order defined by the workflow's flow.

If `drift_detection` is enabled, a judge agent scores the final output's semantic alignment with the original task after the pipeline completes. If the score falls below `drift_threshold`, the pipeline reruns and the cycle repeats until the score meets the threshold.

<ParamField path="task" type="str" required>
  The task for the agents to execute
</ParamField>

<ParamField path="img" type="Optional[str]" default="None">
  An optional image input forwarded to every agent that supports it
</ParamField>

<ParamField path="imgs" type="Optional[List[str]]" default="None">
  Optional list of images. Accepted for API compatibility; only `img` is currently forwarded to the underlying flow.
</ParamField>

<ResponseField name="result" type="str | dict | list">
  The final result, formatted per `output_type`
</ResponseField>
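
The drift-detection cycle described for `run()` can be sketched as a small loop. This is an illustration under stated assumptions, not the library's code: `parse_score`, `run_with_drift_check`, the stand-in `pipeline` and `judge` callables, and the `max_reruns` safety cap are all hypothetical. The documented behaviour has no such cap.

```python
import re
from typing import Callable, Optional


def parse_score(judge_output: str) -> Optional[float]:
    """Return the first number found in the judge's reply if it lies in [0, 1]."""
    match = re.search(r"\d*\.?\d+", judge_output)
    if match is None:
        return None
    score = float(match.group())
    return score if 0.0 <= score <= 1.0 else None


def run_with_drift_check(
    pipeline: Callable[[str], str],
    judge: Callable[[str, str], str],
    task: str,
    drift_threshold: float = 0.75,
    max_reruns: int = 5,  # assumption: safety cap, not a documented parameter
) -> str:
    result = pipeline(task)
    for _ in range(max_reruns):
        score = parse_score(judge(task, result))
        if score is None:
            # Unparseable judge output: skip the check, return the last result.
            return result
        if score >= drift_threshold:
            return result
        # Score below the threshold: rerun the whole pipeline.
        result = pipeline(task)
    return result


attempts = {"count": 0}


def pipeline(task: str) -> str:
    attempts["count"] += 1
    return f"answer v{attempts['count']} for {task}"


def judge(task: str, output: str) -> str:
    # Pretend the first two attempts drift and the third is on target.
    return "0.9" if "v3" in output else "0.4"


final = run_with_drift_check(pipeline, judge, "Summarise Q3 results")
print(final)  # answer v3 for Summarise Q3 results
```

Because every rerun repeats all agent calls, a low threshold or a lenient judge keeps cost down, while a strict one can multiply it.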

### `run_batched(tasks)`

Runs each task in the batch through the full agent sequence, one task at a time.

<ParamField path="tasks" type="List[str]" required>
  A list of tasks for the agents to execute. Must be a non-empty list of strings.
</ParamField>

<ResponseField name="results" type="List[str]">
  A list of final results after processing through all agents
</ResponseField>

### `run_async(task)`

Executes the specified task through the agents asynchronously.

<ParamField path="task" type="str" required>
  The task for the agents to execute. Must be a non-empty string.
</ParamField>

<ResponseField name="result" type="str">
  The final result after processing through all agents
</ResponseField>
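
An awaitable `run_async` composes with standard `asyncio` tools. The sketch below uses a hypothetical `FakeWorkflow` stand-in so it runs without any model calls; only the shape of `run_async(task)` is taken from the description above. Whether a single real workflow instance can safely serve overlapping calls is not stated here, so treat the pattern as an assumption to verify.

```python
import asyncio
from typing import List


class FakeWorkflow:
    """Hypothetical stand-in exposing an awaitable run_async(task)."""

    async def run_async(self, task: str) -> str:
        await asyncio.sleep(0.01)  # simulate waiting on model calls
        return f"done: {task}"


async def run_all(workflow: FakeWorkflow, tasks: List[str]) -> List[str]:
    # gather schedules the coroutines concurrently and keeps input order.
    return await asyncio.gather(*(workflow.run_async(t) for t in tasks))


async_results = asyncio.run(run_all(FakeWorkflow(), ["task A", "task B"]))
print(async_results)  # ['done: task A', 'done: task B']
```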

### `run_concurrent(tasks)`

Executes a batch of tasks through the agents concurrently using a `ThreadPoolExecutor`.

<ParamField path="tasks" type="List[str]" required>
  A list of tasks for the agents to execute. Must be a non-empty list of strings.
</ParamField>

<ResponseField name="results" type="List[str]">
  A list of final results after processing through all agents
</ResponseField>
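
The thread-pool pattern behind `run_concurrent` looks roughly like the sketch below. It is not the library's implementation: `run_tasks_concurrently`, `fake_pipeline`, and the `max_workers` value are hypothetical, and `fake_pipeline` stands in for one full pass through the agents.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_tasks_concurrently(
    run_one: Callable[[str], str],
    tasks: List[str],
    max_workers: int = 4,  # assumption: worker count chosen for illustration
) -> List[str]:
    if not tasks:
        raise ValueError("tasks must be a non-empty list of strings")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map returns results in the order of the inputs.
        return list(pool.map(run_one, tasks))


def fake_pipeline(task: str) -> str:
    return task.upper()


concurrent_results = run_tasks_concurrently(
    fake_pipeline, ["alpha", "beta", "gamma"]
)
print(concurrent_results)  # ['ALPHA', 'BETA', 'GAMMA']
```

Threads suit this workload because each task spends most of its time waiting on network responses from the model provider rather than on CPU work.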

### `run_stream(task, img=None, with_events=False, **kwargs)`

Streams tokens from each agent in pipeline order, in real time. Each agent's tokens are yielded as soon as the LLM produces them; once an agent finishes, its full output is handed off to the next agent. This is the same hand-off as `run()`, only streamed.

Delegates to `AgentRearrange.run_stream` internally.

<ParamField path="task" type="str" required>
  Initial task passed to the first agent in the pipeline.
</ParamField>

<ParamField path="img" type="Optional[str]" default="None">
  Optional image input forwarded to every agent.
</ParamField>

<ParamField path="with_events" type="bool" default="False">
  When `False`, yield plain token strings. When `True`, yield structured event dicts (`agent_start`, `token`, `agent_end`) — useful for per-agent UI panels or attributing each token to its emitting agent.
</ParamField>

<ResponseField name="generator" type="Iterator[str | dict]">
  Sync generator. Yields plain token strings by default, or event dicts when `with_events=True`.
</ResponseField>

```python theme={null}
for token in workflow.run_stream("Analyse NVDA"):
    print(token, end="", flush=True)
```

### `arun_stream(task, img=None, with_events=False, **kwargs)`

Async generator version of `run_stream`. Drop-in for any async caller (FastAPI's `StreamingResponse`, async background workers, etc.). Same parameter and yield semantics as the sync version.

<ResponseField name="async_generator" type="AsyncIterator[str | dict]">
  Async generator. Yields plain token strings by default, or event dicts when `with_events=True`.
</ResponseField>

```python theme={null}
async for token in workflow.arun_stream("Analyse NVDA"):
    print(token, end="", flush=True)
```

#### Structured events

When called with `with_events=True`, both methods yield three event types:

| Type          | Fields            | When emitted                                      |
| ------------- | ----------------- | ------------------------------------------------- |
| `agent_start` | `agent`           | Right before an agent begins streaming            |
| `token`       | `agent`, `token`  | For every token the agent emits                   |
| `agent_end`   | `agent`, `output` | After the agent finishes; carries the full output |

```python theme={null}
async for evt in workflow.arun_stream(task, with_events=True):
    if evt["type"] == "agent_start":
        print(f"--- {evt['agent']} starting ---")
    elif evt["type"] == "token":
        print(evt["token"], end="", flush=True)
    elif evt["type"] == "agent_end":
        print(f"\n--- {evt['agent']} finished ({len(evt['output'])} chars) ---")
```

<Note>
  `max_loops > 1` and `drift_detection` are not applied in streaming mode. Use `run()` if you need those.
</Note>
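
For per-agent UI panels, the three event types can be folded into one text buffer per agent. The sketch below runs against a hypothetical `fake_event_stream` generator that imitates the documented event shapes; `collect_panels` is likewise an illustrative name, not part of the library.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator


def fake_event_stream() -> Iterator[dict]:
    """Hypothetical stand-in for workflow.run_stream(task, with_events=True)."""
    yield {"type": "agent_start", "agent": "Researcher"}
    yield {"type": "token", "agent": "Researcher", "token": "NVDA "}
    yield {"type": "token", "agent": "Researcher", "token": "facts"}
    yield {"type": "agent_end", "agent": "Researcher", "output": "NVDA facts"}
    yield {"type": "agent_start", "agent": "Writer"}
    yield {"type": "token", "agent": "Writer", "token": "Report"}
    yield {"type": "agent_end", "agent": "Writer", "output": "Report"}


def collect_panels(events: Iterable[dict]) -> Dict[str, str]:
    """Accumulate streamed tokens into one text buffer per agent."""
    panels: Dict[str, str] = defaultdict(str)
    for evt in events:
        if evt["type"] == "token":
            panels[evt["agent"]] += evt["token"]
        elif evt["type"] == "agent_end":
            # The end event carries the full output; prefer it over the buffer.
            panels[evt["agent"]] = evt["output"]
    return dict(panels)


panels = collect_panels(fake_event_stream())
print(panels)  # {'Researcher': 'NVDA facts', 'Writer': 'Report'}
```

In a live UI the `token` branch would update the panel incrementally; the `agent_end` branch then replaces the buffer with the authoritative full output.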

### `sequential_flow()`

Constructs a string representation of the agent execution order.

<ResponseField name="flow" type="str">
  A string showing the order of agent execution (e.g., "AgentA -> AgentB -> AgentC")
</ResponseField>
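
A flow string of this shape can be built by joining agent names with ` -> `. The sketch below assumes agents expose an `agent_name` attribute (as in the usage examples) and that plain callables fall back to `__name__`; `NamedStep` and `build_flow` are hypothetical names, and the fallback rule is an assumption rather than documented behaviour.

```python
from typing import Callable, List, Union


class NamedStep:
    """Hypothetical stand-in for an agent with an agent_name attribute."""

    def __init__(self, agent_name: str) -> None:
        self.agent_name = agent_name


def build_flow(steps: List[Union[NamedStep, Callable]]) -> str:
    names = []
    for step in steps:
        # Assumption: use agent_name when present, else the callable's __name__.
        name = getattr(step, "agent_name", None) or getattr(
            step, "__name__", repr(step)
        )
        names.append(name)
    return " -> ".join(names)


def summarize(text: str) -> str:
    return text[:10]


flow = build_flow([NamedStep("AgentA"), NamedStep("AgentB"), NamedStep("AgentC")])
print(flow)  # AgentA -> AgentB -> AgentC
print(build_flow([NamedStep("AgentA"), summarize]))  # AgentA -> summarize
```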

### `reliability_check()`

Validates the workflow configuration and prepares agents for execution.

**Raises:**

* `ValueError`: If the agents list is `None` or empty, or if `max_loops` is set to 0
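
The two failure conditions above amount to a pair of guard clauses. The sketch below is a hedged illustration, not the library's source: `check_config` and `describe` are hypothetical helpers, and the error messages are invented for the example.

```python
from typing import Callable, List, Optional


def check_config(agents: Optional[List[Callable]], max_loops: int) -> None:
    """Raise ValueError for the two misconfigurations listed above."""
    if agents is None or len(agents) == 0:
        raise ValueError("agents list cannot be None or empty")
    if max_loops == 0:
        raise ValueError("max_loops cannot be 0")


def describe(agents: Optional[List[Callable]], max_loops: int) -> str:
    """Return 'ok' or the validation message, so the outcome is easy to inspect."""
    try:
        check_config(agents, max_loops)
        return "ok"
    except ValueError as exc:
        return str(exc)


print(describe([str.upper], 1))  # ok
print(describe([], 1))           # agents list cannot be None or empty
print(describe([str.upper], 0))  # max_loops cannot be 0
```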

## Attributes

| Attribute                   | Type                           | Description                                                             |
| --------------------------- | ------------------------------ | ----------------------------------------------------------------------- |
| `id`                        | str                            | Unique identifier for the workflow instance                             |
| `name`                      | str                            | Human-readable name for the workflow                                    |
| `description`               | str                            | Description of the workflow's purpose                                   |
| `agents`                    | List\[Union\[Agent, Callable]] | List of agents or callables to execute in sequence                      |
| `max_loops`                 | int                            | Maximum number of times to execute the workflow                         |
| `output_type`               | OutputType                     | Format of the output from the workflow                                  |
| `shared_memory_system`      | callable                       | Optional shared memory helper                                           |
| `multi_agent_collab_prompt` | bool                           | Whether the collaboration prompt is appended to each agent              |
| `team_awareness`            | bool                           | Whether team-awareness is enabled on the underlying `AgentRearrange`    |
| `autosave`                  | bool                           | Whether conversation history is autosaved to disk                       |
| `verbose`                   | bool                           | Whether verbose logging is on                                           |
| `drift_threshold`           | float                          | Minimum alignment score required when `drift_detection=True`            |
| `drift_agent`               | Optional\[Agent]               | Judge agent used for drift detection; `None` when disabled              |
| `swarm_workspace_dir`       | Optional\[str]                 | Workspace directory used for autosave; populated by `_setup_autosave()` |
| `flow`                      | str                            | String representation of the agent execution order                      |
| `agent_rearrange`           | AgentRearrange                 | Internal helper for managing agent execution                            |

## Usage Examples

### Basic Sequential Workflow

```python theme={null}
from swarms import Agent, SequentialWorkflow

# Model identifier shared by all agents; passed straight to each Agent
# instead of wrapping a Claude model in an OpenAI-specific client
MODEL_NAME = "claude-sonnet-4-6"

# Create specialized agents
researcher = Agent(
    agent_name="Researcher",
    model_name=MODEL_NAME,
    max_loops=1,
    system_prompt="You are a research specialist. Gather and analyze information."
)

writer = Agent(
    agent_name="Writer",
    model_name=MODEL_NAME,
    max_loops=1,
    system_prompt="You are a content writer. Create engaging content based on research."
)

editor = Agent(
    agent_name="Editor",
    model_name=MODEL_NAME,
    max_loops=1,
    system_prompt="You are an editor. Review and polish the content."
)

# Create sequential workflow
workflow = SequentialWorkflow(
    name="Content Creation Pipeline",
    description="Research -> Write -> Edit pipeline",
    agents=[researcher, writer, editor],
    max_loops=1,
    verbose=True
)

# Run the workflow
result = workflow.run("Create a blog post about artificial intelligence")
print(result)
```

### Batched Task Processing

```python theme={null}
# Process multiple tasks through the same workflow
tasks = [
    "Analyze the impact of AI on healthcare",
    "Discuss the future of renewable energy",
    "Examine cybersecurity trends"
]

results = workflow.run_batched(tasks)

for task, result in zip(tasks, results):
    print(f"Task: {task}")
    print(f"Result: {result}")
    print("-" * 50)
```

### Concurrent Task Execution

```python theme={null}
# Run multiple tasks concurrently (thread-based, so asyncio is not needed)
tasks = [
    "Research quantum computing",
    "Research blockchain technology",
    "Research machine learning"
]

results = workflow.run_concurrent(tasks)
print(f"Processed {len(results)} tasks concurrently")
```

### Async Execution

```python theme={null}
import asyncio

async def process_task():
    result = await workflow.run_async("Analyze climate change solutions")
    return result

# Run async
result = asyncio.run(process_task())
print(result)
```

### With Team Awareness

```python theme={null}
# Create workflow with team awareness
workflow = SequentialWorkflow(
    name="Collaborative Pipeline",
    agents=[researcher, writer, editor],
    team_awareness=True,  # Agents know about each other
    multi_agent_collab_prompt=True,  # Add collaboration prompt
    verbose=True
)

result = workflow.run("Create a comprehensive market analysis")
```

### Custom Output Types

```python theme={null}
# Different output formats
workflow_dict = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="dict"  # Returns dict format
)

workflow_list = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="list"  # Returns list format
)

workflow_str = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="str"  # Returns string format
)
```

### With Autosave

```python theme={null}
import os

# Set workspace directory for autosave
os.environ["WORKSPACE_DIR"] = "./my_workspace"

workflow = SequentialWorkflow(
    name="SavedWorkflow",
    agents=[researcher, writer, editor],
    autosave=True,  # Automatically save conversation history
    verbose=True
)

result = workflow.run("Generate a report")
# Conversation history saved to ./my_workspace/swarms/SequentialWorkflow/
```

## Error Handling

```python theme={null}
try:
    workflow = SequentialWorkflow(
        agents=[researcher, writer],
        max_loops=1
    )
    result = workflow.run("Process this task")
except ValueError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Execution error: {e}")
```

## Best Practices

1. **Agent Design**: Ensure each agent has a clear, specific role in the sequence
2. **Error Handling**: Use try-except blocks around workflow execution
3. **Verbose Mode**: Enable verbose logging during development for debugging
4. **Autosave**: Enable autosave for important workflows to preserve conversation history
5. **Task Clarity**: Provide clear, specific tasks to get the best results
6. **Resource Management**: Use `run_concurrent` for I/O-bound tasks to improve performance
7. **Team Awareness**: Enable team awareness for complex workflows where context matters

## Common Use Cases

* **Content Creation Pipelines**: Research -> Write -> Edit workflows
* **Data Processing**: Extract -> Transform -> Load (ETL) pipelines
* **Analysis Workflows**: Collect -> Analyze -> Report sequences
* **Quality Assurance**: Create -> Review -> Approve chains
* **Multi-Stage Reasoning**: Break complex problems into sequential steps

## Related Classes

* [AgentRearrange](/api/agent-rearrange): For more complex, non-linear agent orchestration
* [ConcurrentWorkflow](/api/concurrent-workflow): For parallel agent execution
* [GraphWorkflow](/api/graph-workflow): For DAG-based agent workflows
* [Agent](/api/agent): The base agent class used in workflows
