# Graph Workflow

> Orchestrate complex agent workflows using Directed Acyclic Graphs (DAGs) with parallel and sequential execution

`GraphWorkflow` orchestrates agents as a Directed Acyclic Graph (DAG), enabling workflows with dependencies, parallel branches, and convergence points. It is best suited to pipelines whose tasks depend on one another in non-trivial ways.

## When to Use

* **Complex dependencies**: Tasks with intricate dependency graphs
* **Parallel branches**: Multiple independent paths that converge
* **Pipeline optimization**: Maximize parallelism while respecting dependencies
* **Software builds**: Compile, test, and deploy pipelines
* **Data processing**: ETL workflows with multiple stages

## Key Features

* DAG-based execution with topological sorting
* Automatic parallelization of independent nodes
* Support for NetworkX and Rustworkx backends
* Fan-out and fan-in patterns
* Entry and exit point management
* Graph visualization (with Graphviz)
* Auto-compilation for performance
* Graph validation and cycle detection

## Basic Example

```python theme={null}
from swarms import Agent
from swarms.structs.graph_workflow import GraphWorkflow

# Define agents
data_collector = Agent(
    agent_name="DataCollector",
    system_prompt="Collect and validate data from sources.",
    model_name="gpt-5.4",
)

processor = Agent(
    agent_name="Processor",
    system_prompt="Process and clean collected data.",
    model_name="gpt-5.4",
)

analyzer = Agent(
    agent_name="Analyzer",
    system_prompt="Analyze processed data for insights.",
    model_name="gpt-5.4",
)

# Create graph workflow
workflow = GraphWorkflow(
    name="Data-Pipeline",
    description="ETL workflow for data processing",
)

# Add agents as nodes
workflow.add_node(data_collector)
workflow.add_node(processor)
workflow.add_node(analyzer)

# Define dependencies
workflow.add_edge("DataCollector", "Processor")  # Collector -> Processor
workflow.add_edge("Processor", "Analyzer")       # Processor -> Analyzer

# Compile and run
workflow.compile()
result = workflow.run("Process Q4 sales data")
print(result)
```

## Creating from Spec

`from_spec()` builds a workflow from a list of agents and a list of edges in a single call:

```python theme={null}
from swarms.structs.graph_workflow import GraphWorkflow

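# Agent instances defined elsewhere; each agent_name must match the node
# names used in the edges below (e.g. collector.agent_name == "DataCollector")
agents = [collector, processor, analyzer, reporter]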

# Define edges (dependencies)
edges = [
    ("DataCollector", "Processor"),
    ("Processor", "Analyzer"),
    ("Analyzer", "Reporter"),
]

# Create workflow
workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
    task="Process data",
)

result = workflow.run()
```

## Advanced Edge Patterns

### Fan-Out Pattern

One node distributes to multiple nodes:

```python theme={null}
# One source to multiple targets
workflow.add_edges_from_source(
    source="DataCollector",
    targets=["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
)

# Equivalent to:
# DataCollector -> TechnicalAnalyst
# DataCollector -> FundamentalAnalyst
# DataCollector -> SentimentAnalyst
# (All three analysts run in parallel)
```

### Fan-In Pattern

Multiple nodes converge to one:

```python theme={null}
# Multiple sources to single target
workflow.add_edges_to_target(
    sources=["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
    target="SynthesisAgent"
)

# Equivalent to:
# TechnicalAnalyst -> SynthesisAgent
# FundamentalAnalyst -> SynthesisAgent
# SentimentAnalyst -> SynthesisAgent
# (Synthesis waits for all three)
```

### Parallel Chain Pattern

Connects every source to every target:

```python theme={null}
# Multiple sources to multiple targets
workflow.add_parallel_chain(
    sources=["Collector1", "Collector2"],
    targets=["Analyst1", "Analyst2", "Analyst3"]
)

# Creates:
# Collector1 -> Analyst1, Analyst2, Analyst3
# Collector2 -> Analyst1, Analyst2, Analyst3
```

### Tuple-Based Patterns

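With `from_spec()`, each edge is a tuple whose source and target can be either a single node name or a list of names, which covers all of the patterns above: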

```python theme={null}
edges = [
    # Simple edge
    ("agent1", "agent2"),
    
    # Fan-out
    ("agent1", ["agent2", "agent3"]),
    
    # Fan-in
    (["agent1", "agent2"], "agent3"),
    
    # Parallel chain
    (["agent1", "agent2"], ["agent3", "agent4"]),
]

workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
)
```
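To make the tuple forms concrete, here is a minimal standalone sketch of how such specs can be expanded into plain one-to-one edges. The helper `expand_edge_spec` is hypothetical and is not part of `GraphWorkflow`; it only illustrates the idea that a list on either side means "every source to every target".

```python
from itertools import product
from typing import List, Sequence, Tuple, Union

NodeSpec = Union[str, Sequence[str]]


def expand_edge_spec(spec: Tuple[NodeSpec, NodeSpec]) -> List[Tuple[str, str]]:
    """Expand one (source, target) spec into plain (source, target) pairs.

    Hypothetical helper for illustration only, not the library's code.
    """
    sources, targets = spec
    # A bare string is a single node; a list is several nodes.
    source_list = [sources] if isinstance(sources, str) else list(sources)
    target_list = [targets] if isinstance(targets, str) else list(targets)
    # Every source connects to every target (Cartesian product).
    return list(product(source_list, target_list))


specs = [
    ("agent1", "agent2"),                          # simple edge
    ("agent1", ["agent2", "agent3"]),              # fan-out
    (["agent1", "agent2"], "agent3"),              # fan-in
    (["agent1", "agent2"], ["agent3", "agent4"]),  # parallel chain
]
for spec in specs:
    print(spec, "->", expand_edge_spec(spec))
```

Under this reading, a parallel chain with two sources and two targets produces four edges.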

## Key Parameters

<ParamField path="name" type="str">
  Name for the workflow
</ParamField>

<ParamField path="description" type="str">
  Description of the workflow's purpose
</ParamField>

<ParamField path="nodes" type="Dict[str, Node]">
  Dictionary of nodes (agents)
</ParamField>

<ParamField path="edges" type="List[Edge]">
  List of edges (dependencies)
</ParamField>

<ParamField path="entry_points" type="List[str]">
  Node IDs with no predecessors (auto-detected if not set)
</ParamField>

<ParamField path="end_points" type="List[str]">
  Node IDs with no successors (auto-detected if not set)
</ParamField>

<ParamField path="max_loops" type="int" default="1">
  Maximum execution loops
</ParamField>

<ParamField path="auto_compile" type="bool" default="True">
  Automatically compile on initialization
</ParamField>

<ParamField path="backend" type="str" default="networkx">
  Graph backend ("networkx" or "rustworkx")
</ParamField>

<ParamField path="verbose" type="bool" default="False">
  Enable verbose logging
</ParamField>

## Methods

### add\_node()

Add an agent to the graph:

```python theme={null}
workflow.add_node(
    agent=my_agent,
    metadata={"priority": "high"}  # Optional metadata
)
```

### add\_nodes()

Add multiple agents concurrently:

```python theme={null}
workflow.add_nodes(
    agents=[agent1, agent2, agent3],
    batch_size=10,
)
```

### add\_edge()

Add a dependency between nodes:

```python theme={null}
# Using agent objects
workflow.add_edge(source_agent, target_agent)

# Using node IDs
workflow.add_edge("Agent1", "Agent2")

# Using Edge object
from swarms.structs.graph_workflow import Edge
edge = Edge(source="Agent1", target="Agent2", metadata={"type": "data"})
workflow.add_edge(edge)
```

### compile()

Pre-compute expensive operations:

```python theme={null}
# Manual compilation
workflow.compile()

# Auto-compilation (default)
workflow = GraphWorkflow(auto_compile=True)
```

Compilation does the following:

* Auto-sets entry/exit points
* Computes topological layers
* Caches for performance
* Validates graph structure

### run()

Execute the workflow:

```python theme={null}
result = workflow.run(
    task="Process data pipeline",
    img=None,  # Optional image input
)
```

## Use Cases

### Software Build Pipeline

```python theme={null}
from swarms import Agent
from swarms.structs.graph_workflow import GraphWorkflow

# Remaining Agent arguments (system_prompt, model_name, ...) are omitted here
compile_agent = Agent(agent_name="Compiler")
test_agent = Agent(agent_name="Tester")
lint_agent = Agent(agent_name="Linter")
security_agent = Agent(agent_name="SecurityScanner")
package_agent = Agent(agent_name="Packager")
deploy_agent = Agent(agent_name="Deployer")

build_pipeline = GraphWorkflow(name="CI-CD-Pipeline")

# Add all nodes
for agent in [compile_agent, test_agent, lint_agent, security_agent, package_agent, deploy_agent]:
    build_pipeline.add_node(agent)

# Define dependencies
build_pipeline.add_edge("Compiler", "Tester")
build_pipeline.add_edges_from_source("Compiler", ["Linter", "SecurityScanner"])
build_pipeline.add_edges_to_target(["Tester", "Linter", "SecurityScanner"], "Packager")
build_pipeline.add_edge("Packager", "Deployer")

build_pipeline.compile()
result = build_pipeline.run("Build and deploy version 2.0")
```

### Data Science Pipeline

```python theme={null}
edges = [
    # Data collection phase
    ("APICollector", "DataValidator"),
    ("DatabaseCollector", "DataValidator"),
    
    # Parallel processing
    ("DataValidator", ["Cleaner", "Transformer", "FeatureEngineer"]),
    
    # Convergence for modeling
    (["Cleaner", "Transformer", "FeatureEngineer"], "ModelTrainer"),
    
    # Evaluation and deployment
    ("ModelTrainer", "ModelEvaluator"),
    ("ModelEvaluator", "Deployer"),
]

ml_pipeline = GraphWorkflow.from_spec(
    agents=[api_collector, db_collector, validator, cleaner, transformer, 
            feature_eng, trainer, evaluator, deployer],
    edges=edges,
)

result = ml_pipeline.run("Train and deploy customer churn model")
```

### Content Creation Workflow

```python theme={null}
# Diamond pattern: Research -> (Write, Design) -> Review
edges = [
    ("Researcher", ["Writer", "Designer"]),
    (["Writer", "Designer"], "Reviewer"),
    ("Reviewer", "Publisher"),
]

content_workflow = GraphWorkflow.from_spec(
    agents=[researcher, writer, designer, reviewer, publisher],
    edges=edges,
)

result = content_workflow.run("Create marketing campaign for product launch")
```

## Entry and Exit Points

### Auto-Detection

```python theme={null}
# Assumes agent1, agent2, agent3 are Agents named "agent1", "agent2", "agent3"
workflow = GraphWorkflow()
workflow.add_node(agent1)
workflow.add_node(agent2)
workflow.add_node(agent3)
workflow.add_edge("agent1", "agent2")
workflow.add_edge("agent2", "agent3")

workflow.compile()

# Automatically detects:
# - entry_points: ["agent1"] (no incoming edges)
# - end_points: ["agent3"] (no outgoing edges)
```
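The detection rule in the comments above can be sketched in a few lines of plain Python. This is an illustration of the rule (no incoming edges means entry point, no outgoing edges means end point), not the library's implementation; the function name is hypothetical.

```python
from typing import Iterable, List, Tuple


def detect_entry_and_end_points(
    nodes: Iterable[str], edges: Iterable[Tuple[str, str]]
) -> Tuple[List[str], List[str]]:
    """Return (entry_points, end_points) for a directed graph.

    Hypothetical helper for illustration only.
    """
    node_list = list(nodes)
    edge_list = list(edges)
    has_incoming = {target for _, target in edge_list}
    has_outgoing = {source for source, _ in edge_list}
    # Entry points have no predecessors; end points have no successors.
    entry_points = [n for n in node_list if n not in has_incoming]
    end_points = [n for n in node_list if n not in has_outgoing]
    return entry_points, end_points


entries, ends = detect_entry_and_end_points(
    ["agent1", "agent2", "agent3"],
    [("agent1", "agent2"), ("agent2", "agent3")],
)
print(entries, ends)  # ['agent1'] ['agent3']
```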

### Manual Setting

```python theme={null}
workflow.set_entry_points(["StartAgent"])
workflow.set_end_points(["FinalAgent"])
```

## Backend Selection

### NetworkX (Default)

```python theme={null}
workflow = GraphWorkflow(backend="networkx")
```

Benefits:

* Pure Python
* Rich ecosystem
* Extensive algorithms
* Easy debugging

### Rustworkx (Performance)

```python theme={null}
workflow = GraphWorkflow(backend="rustworkx")
```

Benefits:

* Rust-based performance
* Faster graph operations
* Lower memory usage
* Better for large graphs

Requires: `pip install rustworkx`

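If Rustworkx may or may not be installed in a given environment, one option is to check for the package first and fall back to the default backend. The sketch below uses only the standard library; `pick_backend` is a hypothetical helper, not part of `swarms`.

```python
from importlib.util import find_spec


def pick_backend(preferred: str = "rustworkx") -> str:
    """Fall back to "networkx" when rustworkx is requested but not installed.

    Hypothetical helper for illustration only.
    """
    if preferred == "rustworkx" and find_spec("rustworkx") is None:
        return "networkx"
    return preferred


backend = pick_backend()
print(f"Using backend: {backend}")
# The result can then be passed on, e.g. GraphWorkflow(backend=backend).
```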
## Topological Execution

The workflow executes in topological layers:

```python theme={null}
# Graph:
# A -> B, C
# B -> D
# C -> D

# Execution layers:
# Layer 0: [A]
# Layer 1: [B, C]  (parallel)
# Layer 2: [D]
```

Nodes within the same layer run in parallel using a `ThreadPoolExecutor`.
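The following standalone sketch shows one way to compute such layers (a layered variant of Kahn's algorithm) and to run each layer on a thread pool. It illustrates the technique only: the function names are hypothetical, and the `work` callable stands in for running an agent.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple


def topological_layers(nodes: List[str], edges: List[Tuple[str, str]]) -> List[List[str]]:
    """Group nodes into layers; each layer depends only on earlier layers."""
    indegree = {n: 0 for n in nodes}
    successors: Dict[str, List[str]] = {n: [] for n in nodes}
    for source, target in edges:
        successors[source].append(target)
        indegree[target] += 1

    layers: List[List[str]] = []
    current = [n for n in nodes if indegree[n] == 0]
    while current:
        layers.append(current)
        next_layer = []
        for node in current:
            for succ in successors[node]:
                indegree[succ] -= 1
                if indegree[succ] == 0:  # all predecessors have been placed
                    next_layer.append(succ)
        current = next_layer
    if sum(len(layer) for layer in layers) != len(nodes):
        raise ValueError("graph contains a cycle")
    return layers


def run_layers(layers: List[List[str]], work: Callable[[str], str]) -> Dict[str, str]:
    """Run `work` per node, layer by layer, in parallel within each layer."""
    results: Dict[str, str] = {}
    with ThreadPoolExecutor() as pool:
        for layer in layers:
            # Consuming the map results waits for every node in this layer
            # before the next layer starts.
            for node, output in zip(layer, pool.map(work, layer)):
                results[node] = output
    return results


layers = topological_layers(
    ["A", "B", "C", "D"],
    [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")],
)
print(layers)  # [['A'], ['B', 'C'], ['D']]
results = run_layers(layers, lambda name: f"{name} done")
```

Threads suit this kind of workload because agent calls are typically dominated by network I/O rather than CPU work.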

## Graph Validation

### Cycle Detection

```python theme={null}
try:
    workflow.add_edge("A", "B")
    workflow.add_edge("B", "C")
    workflow.add_edge("C", "A")  # Creates cycle
    workflow.compile()
except ValueError as e:
    print(f"Cycle detected: {e}")
```
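To see what cycle detection looks like independently of `GraphWorkflow`, the same three edges can be checked with Python's standard `graphlib` module (Python 3.9+). This is a sketch of the general technique, not a description of how the library detects cycles. Note that `graphlib.CycleError` is a subclass of `ValueError`.

```python
from graphlib import CycleError, TopologicalSorter

# Map each node to the set of nodes it depends on (its predecessors).
# Edges A -> B, B -> C, C -> A become:
predecessors = {"B": {"A"}, "C": {"B"}, "A": {"C"}}

try:
    TopologicalSorter(predecessors).prepare()
    cycle = None
except CycleError as err:
    # err.args[1] lists the nodes on the cycle; the first node is
    # repeated at the end.
    cycle = err.args[1]

print(f"Cycle: {cycle}")
```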

### Dependency Validation

```python theme={null}
# Validates:
# - All referenced nodes exist
# - No self-loops
# - DAG structure (no cycles)
# - Entry points exist
# - End points exist
```
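The first two checks in that list are simple enough to sketch directly. The helper below is hypothetical and only illustrates the idea; cycle and entry/end-point checks are left out to keep it short.

```python
from typing import Iterable, List, Tuple


def find_edge_problems(
    nodes: Iterable[str], edges: Iterable[Tuple[str, str]]
) -> List[str]:
    """Report self-loops and edges that reference unknown nodes.

    Hypothetical helper for illustration only.
    """
    known = set(nodes)
    problems = []
    for source, target in edges:
        if source == target:
            problems.append(f"self-loop on {source!r}")
        # sorted() keeps the report order deterministic.
        for name in sorted({source, target} - known):
            problems.append(
                f"unknown node {name!r} in edge {source!r} -> {target!r}"
            )
    return problems


problems = find_edge_problems(
    ["A", "B"],
    [("A", "B"), ("B", "B"), ("A", "Z")],
)
for problem in problems:
    print(problem)
```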

## Performance Optimization

### Compilation Caching

```python theme={null}
# First run: compiles
result1 = workflow.run("Task 1")

# Subsequent runs: uses cache
result2 = workflow.run("Task 2")
result3 = workflow.run("Task 3")

# Manual recompilation (if graph changed)
workflow.compile()
```
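The caching idea can be illustrated with a toy class that compiles lazily and remembers that it has done so. This is not `GraphWorkflow`: in this sketch `add_edge` marks the cache as stale automatically, which is one possible design and an assumption of the example. With `GraphWorkflow`, call `compile()` yourself after changing the graph.

```python
class CompiledGraph:
    """Toy graph that compiles once and recompiles only after a change.

    Illustration only; names and behavior are assumptions of this sketch.
    """

    def __init__(self):
        self.edges = []
        self._compiled = False
        self.compile_count = 0

    def add_edge(self, source, target):
        self.edges.append((source, target))
        self._compiled = False  # structure changed, so the cache is stale

    def compile(self):
        # A real implementation would compute layers, entry points, etc.
        self.compile_count += 1
        self._compiled = True

    def run(self, task):
        if not self._compiled:
            self.compile()
        return f"ran {task!r} over {len(self.edges)} edge(s)"


graph = CompiledGraph()
graph.add_edge("A", "B")
graph.run("Task 1")  # compiles
graph.run("Task 2")  # reuses the cached compilation
count_before_change = graph.compile_count
graph.add_edge("B", "C")
graph.run("Task 3")  # recompiles because the structure changed
print(count_before_change, graph.compile_count)  # 1 2
```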

### Concurrent Node Addition

```python theme={null}
# Add many nodes efficiently
workflow.add_nodes(
    agents=agent_list,
    batch_size=10,  # Process in batches
)
```

## Best Practices

<Note>
  **Graph Design**: Keep graphs acyclic; model iterative processes as multiple workflows
</Note>

1. **Clear Dependencies**: Only add edges for true dependencies
2. **Maximize Parallelism**: Let independent nodes run concurrently
3. **Compilation**: Compile explicitly before running so that structural problems surface before any agent executes
4. **Entry/Exit Points**: Let auto-detection work unless you need specific control
5. **Backend Choice**: Use Rustworkx for large graphs (>100 nodes)

<Warning>
  Graph compilation is cached. Call `compile()` again if the graph structure changes after the initial compilation.
</Warning>

## Error Handling

```python theme={null}
try:
    workflow.add_edge("NonExistentAgent", "TargetAgent")
except ValueError as e:
    print(f"Invalid edge: {e}")
    # Source or target node doesn't exist

try:
    workflow.compile()
except Exception as e:
    print(f"Compilation failed: {e}")
    # Cycle detected or invalid graph structure
```

## Visualization

With Graphviz installed:

```python theme={null}
# Requires: pip install graphviz

# Visualize workflow structure
# (Implementation depends on GraphWorkflow visualization methods)
```
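Independently of whichever visualization methods your installed version provides, a graph's structure can be written out as Graphviz DOT text by hand. The sketch below uses only the standard library; `to_dot` is a hypothetical helper, and the edge list is assumed to be available as `(source, target)` name pairs.

```python
from typing import Iterable, Tuple


def to_dot(name: str, edges: Iterable[Tuple[str, str]]) -> str:
    """Render edges as Graphviz DOT source (left-to-right layout).

    Hypothetical helper for illustration only.
    """
    lines = [f'digraph "{name}" {{', "    rankdir=LR;"]
    for source, target in edges:
        lines.append(f'    "{source}" -> "{target}";')
    lines.append("}")
    return "\n".join(lines)


dot_source = to_dot(
    "Data-Pipeline",
    [("DataCollector", "Processor"), ("Processor", "Analyzer")],
)
print(dot_source)
```

Saved to a file such as `pipeline.dot`, the output can be rendered with the Graphviz command-line tool, for example `dot -Tpng pipeline.dot -o pipeline.png`.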

## Related Architectures

* [Agent Rearrange](/architectures/agent-rearrange) - Simpler flow patterns
* [Sequential Workflow](/architectures/sequential-workflow) - Linear execution
* [Concurrent Workflow](/architectures/concurrent-workflow) - Pure parallel
* [Hierarchical Swarm](/architectures/hierarchical-swarm) - Director coordination
