> ## Documentation Index
> Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
> Use this file to discover all available pages before exploring further.

# Sequential Workflow

> Execute agents in a linear chain where each agent's output becomes the next agent's input

The `SequentialWorkflow` orchestrates multiple agents in a linear chain, where each agent processes the output from the previous agent. This creates a pipeline where tasks flow through a series of specialized agents.

## When to Use

* **Step-by-step processes**: Tasks with clear sequential dependencies
* **Data transformation pipelines**: Progressive refinement of outputs
* **Multi-stage analysis**: Research → Analysis → Writing → Review
* **Quality improvement**: Multiple rounds of refinement

## Key Features

* Automatic flow construction from agent list
* Support for multiple execution loops
* Shared memory between agents (optional)
* Team awareness capabilities
* Conversation history tracking
* Autosave functionality

## Basic Example

```python theme={null}
from swarms import Agent, SequentialWorkflow

# Create specialized agents
researcher = Agent(
    agent_name="Researcher",
    system_prompt="Research the given topic and provide detailed information.",
    model_name="gpt-5.4",
    max_loops=1,
)

writer = Agent(
    agent_name="Writer",
    system_prompt="Transform research into an engaging article.",
    model_name="gpt-5.4",
    max_loops=1,
)

editor = Agent(
    agent_name="Editor",
    system_prompt="Edit and polish the article for publication.",
    model_name="gpt-5.4",
    max_loops=1,
)

# Create workflow
workflow = SequentialWorkflow(
    agents=[researcher, writer, editor],
    max_loops=1,
)

# Execute
result = workflow.run("The impact of AI on healthcare")
print(result)
```

## Advanced Configuration

### With Team Awareness

```python theme={null}
workflow = SequentialWorkflow(
    name="ContentPipeline",
    description="Research, write, and edit content",
    agents=[researcher, writer, editor],
    max_loops=1,
    team_awareness=True,  # Agents know about each other
    multi_agent_collab_prompt=True,  # Add collaboration instructions
    output_type="dict",
    autosave=True,
    verbose=True,
)
```

### With Shared Memory

```python theme={null}
from swarms.memory.langtrace_memory import LangTrace

# Initialize shared memory
shared_memory = LangTrace()

workflow = SequentialWorkflow(
    agents=[researcher, writer, editor],
    shared_memory_system=shared_memory,
    max_loops=1,
)
```

## Key Parameters

<ParamField path="name" type="str" default="SequentialWorkflow">
  Identifier for the workflow instance
</ParamField>

<ParamField path="agents" type="List[Agent]" required>
  List of agents to execute sequentially
</ParamField>

<ParamField path="max_loops" type="int" default="1">
  Number of times to execute the complete workflow
</ParamField>

<ParamField path="output_type" type="OutputType" default="dict">
  Format for workflow output (dict, str, list, etc.)
</ParamField>

<ParamField path="team_awareness" type="bool" default="False">
  Enable agents to know about team structure and flow
</ParamField>

<ParamField path="multi_agent_collab_prompt" type="bool" default="False">
  Add collaboration instructions to agent prompts
</ParamField>

<ParamField path="autosave" type="bool" default="True">
  Automatically save conversation history
</ParamField>

## Methods

### run()

Execute the workflow with a task.

```python theme={null}
result = workflow.run(
    task="Analyze quantum computing trends",
    img=None,  # Optional image input
)
```

### run\_batched()

Process multiple tasks sequentially.

```python theme={null}
tasks = [
    "AI in healthcare",
    "AI in finance",
    "AI in education"
]

results = workflow.run_batched(tasks)
```

### run\_async()

Execute workflow asynchronously.

```python theme={null}
import asyncio

async def main():
    result = await workflow.run_async("Task description")
    return result

result = asyncio.run(main())
```

### run\_concurrent()

Run multiple tasks concurrently.

```python theme={null}
tasks = ["Task 1", "Task 2", "Task 3"]
results = workflow.run_concurrent(tasks)
```

### run\_stream() / arun\_stream()

Stream tokens from each agent in pipeline order, in real time. Each agent's tokens are yielded the moment the LLM produces them; once an agent finishes, its full output is handed off to the next agent — same hand-off as `run()`, just streamed.

```python theme={null}
# Sync generator
for token in workflow.run_stream("Analyse NVDA"):
    print(token, end="", flush=True)
```

```python theme={null}
# Async generator
import asyncio

async def main():
    async for token in workflow.arun_stream("Analyse NVDA"):
        print(token, end="", flush=True)

asyncio.run(main())
```

#### Structured events: `with_events=True`

By default both methods yield plain token strings. Pass `with_events=True` to receive structured event dicts instead — useful when you want to render per-agent panels, attribute tokens to their emitting agent, or know exactly when each agent starts and finishes.

```python theme={null}
async for evt in workflow.arun_stream(task, with_events=True):
    if evt["type"] == "agent_start":
        print(f"--- {evt['agent']} starting ---")
    elif evt["type"] == "token":
        print(evt["token"], end="", flush=True)
    elif evt["type"] == "agent_end":
        print(f"\n--- {evt['agent']} finished ({len(evt['output'])} chars) ---")
```

Three event types are emitted:

| Type          | Fields            | When                                                  |
| ------------- | ----------------- | ----------------------------------------------------- |
| `agent_start` | `agent`           | Right before an agent begins streaming                |
| `token`       | `agent`, `token`  | For every token emitted by the agent                  |
| `agent_end`   | `agent`, `output` | After the agent has finished, carries the full output |

<Note>
  `max_loops > 1` and `drift_detection` are not applied in streaming mode. Use `run()` if you need those.
</Note>

## Use Cases

### Content Creation Pipeline

```python theme={null}
# Research → Write → Edit → Publish
researcher = Agent(agent_name="Researcher", ...)
writer = Agent(agent_name="Writer", ...)
editor = Agent(agent_name="Editor", ...)
publisher = Agent(agent_name="Publisher", ...)

pipeline = SequentialWorkflow(
    agents=[researcher, writer, editor, publisher]
)

article = pipeline.run("Latest developments in renewable energy")
```

### Data Analysis Pipeline

```python theme={null}
# Collect → Clean → Analyze → Visualize
collector = Agent(agent_name="DataCollector", ...)
cleaner = Agent(agent_name="DataCleaner", ...)
analyst = Agent(agent_name="Analyst", ...)
visualizer = Agent(agent_name="Visualizer", ...)

analysis_pipeline = SequentialWorkflow(
    agents=[collector, cleaner, analyst, visualizer]
)

result = analysis_pipeline.run("Q4 sales data")
```

### Code Review Pipeline

```python theme={null}
# Generate → Test → Review → Document
coder = Agent(agent_name="Coder", ...)
tester = Agent(agent_name="Tester", ...)
reviewer = Agent(agent_name="Reviewer", ...)
documenter = Agent(agent_name="Documenter", ...)

code_pipeline = SequentialWorkflow(
    agents=[coder, tester, reviewer, documenter]
)

output = code_pipeline.run("Create a binary search function")
```

## Best Practices

<Note>
  **Tip**: Keep each agent focused on a single responsibility for cleaner workflows
</Note>

1. **Clear Agent Roles**: Each agent should have a distinct, well-defined purpose
2. **Appropriate Ordering**: Place agents in logical sequence (research before writing)
3. **Output Formatting**: Ensure each agent's output format matches the next agent's expected input
4. **Error Handling**: Monitor for failures in early stages to prevent cascade issues
5. **Performance**: Consider workflow length - too many agents can slow execution

## Advanced Features

### Custom Flow Generation

The workflow automatically generates a flow string:

```python theme={null}
workflow = SequentialWorkflow(
    agents=[agent1, agent2, agent3]
)

# Automatically generates: "agent1 -> agent2 -> agent3"
print(workflow.flow)
```

### Conversation Tracking

Access complete conversation history:

```python theme={null}
result = workflow.run("Task")

# Access conversation through internal agent_rearrange
history = workflow.agent_rearrange.conversation.conversation_history
```

<Warning>
  The workflow validates that `agents` is not empty and `max_loops > 0` on initialization
</Warning>

## Related Architectures

* [Concurrent Workflow](/architectures/concurrent-workflow) - For parallel execution
* [Agent Rearrange](/architectures/agent-rearrange) - For custom flows
* [Hierarchical Swarm](/architectures/hierarchical-swarm) - For complex orchestration
