> ## Documentation Index
> Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
> Use this file to discover all available pages before exploring further.

# GraphWorkflow

> A powerful DAG-based workflow system for complex multi-agent orchestration with parallel execution

## Overview

The `GraphWorkflow` class represents a workflow graph where each node is an agent. It provides sophisticated capabilities for building, executing, and visualizing Directed Acyclic Graph (DAG) workflows with automatic parallel execution, topological sorting, and compilation optimization.

## Key Features

* **DAG-Based Workflows**: Build complex directed acyclic graphs of agent execution
* **Automatic Parallelization**: Automatically execute independent agents in parallel
* **Topological Execution**: Agents execute in topologically sorted layers
* **Compilation Optimization**: Pre-compute execution plans for faster multi-loop execution
* **Multiple Graph Backends**: Support for NetworkX and Rustworkx backends
* **Visualization**: Generate visual representations of workflows using Graphviz
* **Fan-Out/Fan-In Patterns**: Easy creation of parallel processing patterns
* **JSON Serialization**: Save and load workflows to/from JSON
* **Async Support**: Asynchronous execution for non-blocking operations
* **Cycle Detection**: Automatic detection and reporting of cycles

## Installation

```bash theme={null}
pip install -U swarms

# For visualization support
pip install graphviz

# For Rustworkx backend (optional, faster for large graphs)
pip install rustworkx
```

## Class Definition

```python theme={null}
class GraphWorkflow:
    def __init__(
        self,
        id: Optional[str] = str(uuid.uuid4()),
        name: Optional[str] = "Graph-Workflow-01",
        description: Optional[str] = "A customizable workflow system for orchestrating and coordinating multiple agents.",
        nodes: Optional[Dict[str, Node]] = None,
        edges: Optional[List[Edge]] = None,
        entry_points: Optional[List[str]] = None,
        end_points: Optional[List[str]] = None,
        max_loops: int = 1,
        task: Optional[str] = None,
        auto_compile: bool = True,
        verbose: bool = False,
        backend: str = "networkx",
    )
```

## Parameters

<ParamField path="id" type="Optional[str]" default="uuid.uuid4()">
  Unique identifier for the workflow. Auto-generated UUID if not provided.
</ParamField>

<ParamField path="name" type="Optional[str]" default="Graph-Workflow-01">
  Human-readable name for the workflow
</ParamField>

<ParamField path="description" type="Optional[str]" default="A customizable workflow system...">
  Description of the workflow's purpose
</ParamField>

<ParamField path="nodes" type="Optional[Dict[str, Node]]" default="None">
  Dictionary of nodes in the graph, where the key is the node ID and the value is the Node object
</ParamField>

<ParamField path="edges" type="Optional[List[Edge]]" default="None">
  List of edges in the graph, where each edge is represented by an Edge object
</ParamField>

<ParamField path="entry_points" type="Optional[List[str]]" default="None">
  List of node IDs that serve as entry points to the graph. Auto-detected if not provided.
</ParamField>

<ParamField path="end_points" type="Optional[List[str]]" default="None">
  List of node IDs that serve as end points of the graph. Auto-detected if not provided.
</ParamField>

<ParamField path="max_loops" type="int" default="1">
  Maximum number of times to execute the workflow
</ParamField>

<ParamField path="task" type="Optional[str]" default="None">
  The task to be executed by the workflow
</ParamField>

<ParamField path="auto_compile" type="bool" default="True">
  Whether to automatically compile the graph for optimization
</ParamField>

<ParamField path="verbose" type="bool" default="False">
  Whether to enable verbose logging
</ParamField>

<ParamField path="backend" type="str" default="networkx">
  Graph backend to use. Options: "networkx", "rustworkx" (Rustworkx is faster for large graphs)
</ParamField>

## Methods

### Graph Construction

#### `add_node(agent, **kwargs)`

Add an agent node to the workflow graph.

<ParamField path="agent" type="Agent" required>
  The agent to add as a node
</ParamField>

<ParamField path="**kwargs" type="Any">
  Additional keyword arguments for the node
</ParamField>

#### `add_nodes(agents, batch_size=10, **kwargs)`

Add multiple agents to the workflow graph concurrently in batches.

<ParamField path="agents" type="List[Agent]" required>
  List of agents to add
</ParamField>

<ParamField path="batch_size" type="int" default="10">
  Number of agents to add concurrently in a batch
</ParamField>

#### `add_edge(edge_or_source, target=None, **kwargs)`

Add an edge by Edge object or by passing node objects/ids.

<ParamField path="edge_or_source" type="Union[Edge, Node, Agent, str]" required>
  Either an Edge object or the source node/id
</ParamField>

<ParamField path="target" type="Optional[Union[Node, Agent, str]]">
  Target node/id (required if edge\_or\_source is not an Edge)
</ParamField>

#### `add_edges_from_source(source, targets, **kwargs)`

Add multiple edges from a single source to multiple targets (fan-out pattern).

<ParamField path="source" type="Union[Node, Agent, str]" required>
  Source node/id that will send output to multiple targets
</ParamField>

<ParamField path="targets" type="List[Union[Node, Agent, str]]" required>
  List of target node/ids that will receive the source output in parallel
</ParamField>

<ResponseField name="edges" type="List[Edge]">
  List of created Edge objects
</ResponseField>

#### `add_edges_to_target(sources, target, **kwargs)`

Add multiple edges from multiple sources to a single target (fan-in pattern).

<ParamField path="sources" type="List[Union[Node, Agent, str]]" required>
  List of source node/ids that will send output to the target
</ParamField>

<ParamField path="target" type="Union[Node, Agent, str]" required>
  Target node/id that will receive all source outputs
</ParamField>

<ResponseField name="edges" type="List[Edge]">
  List of created Edge objects
</ResponseField>

#### `add_parallel_chain(sources, targets, **kwargs)`

Create a parallel processing chain (full mesh connection).

<ParamField path="sources" type="List[Union[Node, Agent, str]]" required>
  List of source node/ids
</ParamField>

<ParamField path="targets" type="List[Union[Node, Agent, str]]" required>
  List of target node/ids
</ParamField>

<ResponseField name="edges" type="List[Edge]">
  List of created Edge objects
</ResponseField>

### Execution

#### `run(task=None, img=None, *args, **kwargs)`

Run the workflow graph with optimized parallel agent execution.

<ParamField path="task" type="Optional[str]">
  Task to execute. Uses self.task if not provided.
</ParamField>

<ParamField path="img" type="Optional[str]">
  Optional image path for multimodal tasks
</ParamField>

<ResponseField name="results" type="Dict[str, Any]">
  Execution results from all nodes
</ResponseField>

#### `arun(task=None, *args, **kwargs)`

Async version of run for better performance with I/O bound operations.

<ParamField path="task" type="Optional[str]">
  Task to execute. Uses self.task if not provided.
</ParamField>

<ResponseField name="results" type="Dict[str, Any]">
  Execution results from all nodes
</ResponseField>

### Compilation and Optimization

#### `compile()`

Pre-compute expensive operations for faster execution. Results are cached.

#### `get_compilation_status()`

Get detailed compilation status information.

<ResponseField name="status" type="Dict[str, Any]">
  Compilation status including cache state, timestamps, and performance metrics
</ResponseField>

### Entry/Exit Points

#### `set_entry_points(entry_points)`

Set the entry points for the workflow.

<ParamField path="entry_points" type="List[str]" required>
  List of node IDs to serve as entry points
</ParamField>

#### `set_end_points(end_points)`

Set the end points for the workflow.

<ParamField path="end_points" type="List[str]" required>
  List of node IDs to serve as end points
</ParamField>

#### `auto_set_entry_points()`

Automatically set entry points to nodes with no incoming edges.

#### `auto_set_end_points()`

Automatically set end points to nodes with no outgoing edges.

### Visualization

#### `visualize(format="png", view=True, engine="dot", show_summary=False)`

Visualize the workflow graph using Graphviz.

<ParamField path="format" type="str" default="png">
  Output format: 'png', 'svg', 'pdf', 'dot'
</ParamField>

<ParamField path="view" type="bool" default="True">
  Whether to open the visualization after creation
</ParamField>

<ParamField path="engine" type="str" default="dot">
  Graphviz layout engine: 'dot', 'neato', 'fdp', 'sfdp', 'twopi', 'circo'
</ParamField>

<ParamField path="show_summary" type="bool" default="False">
  Whether to print parallel processing summary
</ParamField>

<ResponseField name="filepath" type="str">
  Path to the generated visualization file
</ResponseField>

#### `visualize_simple()`

Simple text-based visualization for environments without Graphviz.

<ResponseField name="visualization" type="str">
  Text representation of the workflow
</ResponseField>

### Serialization

#### `to_json(fast=True, include_conversation=False, include_runtime_state=False)`

Serialize the workflow to JSON.

<ParamField path="fast" type="bool" default="True">
  Whether to use fast JSON serialization
</ParamField>

<ParamField path="include_conversation" type="bool" default="False">
  Whether to include conversation history
</ParamField>

<ParamField path="include_runtime_state" type="bool" default="False">
  Whether to include runtime state like compilation info
</ParamField>

<ResponseField name="json_string" type="str">
  JSON representation of the workflow
</ResponseField>

#### `from_json(json_str, restore_runtime_state=False)` (classmethod)

Deserialize a workflow from JSON.

<ParamField path="json_str" type="str" required>
  JSON string representation of the workflow
</ParamField>

<ParamField path="restore_runtime_state" type="bool" default="False">
  Whether to restore runtime state
</ParamField>

<ResponseField name="workflow" type="GraphWorkflow">
  A new GraphWorkflow instance
</ResponseField>

#### `save_to_file(filepath, include_conversation=False, include_runtime_state=False, overwrite=False)`

Save the workflow to a JSON file.

<ParamField path="filepath" type="str" required>
  Path to save the JSON file
</ParamField>

<ParamField path="overwrite" type="bool" default="False">
  Whether to overwrite existing files
</ParamField>

<ResponseField name="filepath" type="str">
  Path to the saved file
</ResponseField>

#### `load_from_file(filepath, restore_runtime_state=False)` (classmethod)

Load a workflow from a JSON file.

<ParamField path="filepath" type="str" required>
  Path to the JSON file
</ParamField>

<ResponseField name="workflow" type="GraphWorkflow">
  Loaded workflow instance
</ResponseField>

### Validation

#### `validate(auto_fix=False)`

Validate the workflow structure.

<ParamField path="auto_fix" type="bool" default="False">
  Whether to automatically fix simple issues
</ParamField>

<ResponseField name="validation_result" type="Dict[str, Any]">
  Dictionary containing validation results, including validity, warnings and errors
</ResponseField>

### Factory Method

#### `from_spec(agents, edges, entry_points=None, end_points=None, task=None, **kwargs)` (classmethod)

Construct a workflow from a list of agents and connections.

<ParamField path="agents" type="List[Union[Agent, Node]]" required>
  List of agents or Node objects
</ParamField>

<ParamField path="edges" type="List[Union[Edge, Tuple[Any, Any]]]" required>
  List of edges or edge tuples. Supports fan-out, fan-in, and parallel chain patterns.
</ParamField>

<ParamField path="entry_points" type="Optional[List[str]]">
  List of entry point node IDs
</ParamField>

<ParamField path="end_points" type="Optional[List[str]]">
  List of end point node IDs
</ParamField>

<ParamField path="task" type="Optional[str]">
  Task to be executed by the workflow
</ParamField>

<ResponseField name="workflow" type="GraphWorkflow">
  A new GraphWorkflow instance
</ResponseField>

## Usage Examples

### Basic Sequential Graph

```python theme={null}
from swarms import Agent, GraphWorkflow
from swarms.models import OpenAIChat

# Initialize LLM
llm = OpenAIChat(model_name="claude-sonnet-4-6")

# Create agents
researcher = Agent(
    agent_name="Researcher",
    llm=llm,
    system_prompt="Research and gather information"
)

analyzer = Agent(
    agent_name="Analyzer",
    llm=llm,
    system_prompt="Analyze the research findings"
)

writer = Agent(
    agent_name="Writer",
    llm=llm,
    system_prompt="Write based on analysis"
)

# Create workflow
workflow = GraphWorkflow(
    name="Research-Analyze-Write",
    description="Sequential research workflow"
)

# Add nodes
workflow.add_node(researcher)
workflow.add_node(analyzer)
workflow.add_node(writer)

# Add edges
workflow.add_edge("Researcher", "Analyzer")
workflow.add_edge("Analyzer", "Writer")

# Compile and run
workflow.compile()
results = workflow.run("Research AI trends in healthcare")
```

### Fan-Out Pattern (Parallel Specialists)

```python theme={null}
# Create multiple specialist agents
technical = Agent(agent_name="Technical", llm=llm, system_prompt="Technical analysis")
business = Agent(agent_name="Business", llm=llm, system_prompt="Business analysis")
user_exp = Agent(agent_name="UX", llm=llm, system_prompt="User experience analysis")
synthesizer = Agent(agent_name="Synthesizer", llm=llm, system_prompt="Synthesize all analyses")

workflow = GraphWorkflow(name="Multi-Perspective")

# Add all nodes
for agent in [researcher, technical, business, user_exp, synthesizer]:
    workflow.add_node(agent)

# Fan-out from researcher to specialists
workflow.add_edges_from_source(
    "Researcher",
    ["Technical", "Business", "UX"]
)

# Fan-in to synthesizer
workflow.add_edges_to_target(
    ["Technical", "Business", "UX"],
    "Synthesizer"
)

results = workflow.run("Analyze AI chatbot implementation")
```

### Complex DAG Workflow

```python theme={null}
# Create a complex multi-stage workflow
data_collector = Agent(agent_name="DataCollector", llm=llm)
preprocessor = Agent(agent_name="Preprocessor", llm=llm)
model_a = Agent(agent_name="ModelA", llm=llm)
model_b = Agent(agent_name="ModelB", llm=llm)
model_c = Agent(agent_name="ModelC", llm=llm)
ensemble = Agent(agent_name="Ensemble", llm=llm)
validator = Agent(agent_name="Validator", llm=llm)

workflow = GraphWorkflow(name="ML-Pipeline")

# Add all nodes
agents = [data_collector, preprocessor, model_a, model_b, model_c, ensemble, validator]
workflow.add_nodes(agents, batch_size=5)

# Build DAG
workflow.add_edge("DataCollector", "Preprocessor")
workflow.add_edges_from_source("Preprocessor", ["ModelA", "ModelB", "ModelC"])
workflow.add_edges_to_target(["ModelA", "ModelB", "ModelC"], "Ensemble")
workflow.add_edge("Ensemble", "Validator")

# Visualize before running
workflow.visualize(show_summary=True)

# Execute
results = workflow.run("Predict customer churn")
```

### Using from\_spec for Quick Construction

```python theme={null}
# Quick workflow construction using from_spec
agents = [researcher, analyzer, writer]

edges = [
    ("Researcher", "Analyzer"),
    ("Analyzer", "Writer")
]

workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
    name="Quick-Workflow",
    task="Analyze quantum computing trends"
)

results = workflow.run()  # Uses task from initialization
```

### Advanced Edge Patterns in from\_spec

```python theme={null}
agents = [collector, analyst1, analyst2, analyst3, synthesizer]

edges = [
    # Simple edge
    ("Collector", "Analyst1"),
    
    # Fan-out
    ("Collector", ["Analyst2", "Analyst3"]),
    
    # Fan-in
    (["Analyst1", "Analyst2", "Analyst3"], "Synthesizer"),
]

workflow = GraphWorkflow.from_spec(
    agents=agents,
    edges=edges,
    verbose=True
)
```

### Async Execution

```python theme={null}
import asyncio

async def run_workflow():
    workflow = GraphWorkflow(name="Async-Workflow")
    workflow.add_nodes([agent1, agent2, agent3])
    workflow.add_edge("agent1", "agent2")
    workflow.add_edge("agent2", "agent3")
    
    results = await workflow.arun("Process this task asynchronously")
    return results

results = asyncio.run(run_workflow())
```

### Save and Load Workflows

```python theme={null}
# Save workflow
workflow.save_to_file(
    "my_workflow.json",
    include_conversation=True,
    include_runtime_state=True
)

# Load workflow
loaded_workflow = GraphWorkflow.load_from_file(
    "my_workflow.json",
    restore_runtime_state=True
)

# Continue execution
results = loaded_workflow.run("New task")
```

### Using Rustworkx Backend

```python theme={null}
# Use Rustworkx for better performance on large graphs
workflow = GraphWorkflow(
    name="Large-Workflow",
    backend="rustworkx",  # Faster for large graphs
    verbose=True
)

# Add many nodes
workflow.add_nodes(list_of_100_agents, batch_size=20)

# Build complex graph
# ...

results = workflow.run("Complex task")
```

### Workflow Validation

```python theme={null}
# Validate workflow before execution
validation = workflow.validate(auto_fix=True)

if validation["is_valid"]:
    print("Workflow is valid!")
    results = workflow.run(task)
else:
    print("Errors:", validation["errors"])
    print("Warnings:", validation["warnings"])
```

### Compilation for Multi-Loop Execution

```python theme={null}
workflow = GraphWorkflow(
    name="Multi-Loop-Workflow",
    max_loops=10,  # Will run 10 times
    auto_compile=True  # Compilation cached for all loops
)

workflow.add_nodes([agent1, agent2, agent3])
workflow.add_edges_from_source("agent1", ["agent2", "agent3"])

# Compilation is cached and reused across all loops
results = workflow.run("Task to repeat 10 times")

# Check compilation status
status = workflow.get_compilation_status()
print(f"Compiled: {status['is_compiled']}")
print(f"Layers: {status['cached_layers_count']}")
```

### Custom Visualization

```python theme={null}
# Generate different visualization formats
workflow.visualize(
    format="svg",
    engine="fdp",  # Force-directed layout
    view=False,
    show_summary=True
)

# Simple text visualization (no Graphviz needed)
text_viz = workflow.visualize_simple()
print(text_viz)
```

## Node and Edge Classes

### Node

```python theme={null}
from swarms.structs.graph_workflow import Node, NodeType

# Create node from agent
node = Node.from_agent(agent)

# Create node manually
node = Node(
    id="my-node",
    type=NodeType.AGENT,
    agent=agent,
    metadata={"custom_key": "custom_value"}
)
```

### Edge

```python theme={null}
from swarms.structs.graph_workflow import Edge

# Create edge from nodes
edge = Edge.from_nodes(source_agent, target_agent)

# Create edge from IDs
edge = Edge(source="agent1", target="agent2")

# With metadata
edge = Edge(
    source="agent1",
    target="agent2",
    metadata={"weight": 1.0}
)
```

## Best Practices

1. **Compilation**: Always compile workflows before execution for optimal performance
2. **Visualization**: Use `visualize()` to understand your workflow structure
3. **Validation**: Validate workflows before production deployment
4. **Entry/Exit Points**: Let the system auto-detect entry/exit points or set them explicitly
5. **Backend Selection**: Use Rustworkx for large graphs (>100 nodes)
6. **Error Handling**: Wrap execution in try-except blocks
7. **Batching**: Use `add_nodes()` with appropriate batch sizes for many agents
8. **Caching**: Enable `auto_compile` for multi-loop workflows
9. **Save Progress**: Save complex workflows to JSON for reuse

## Performance Characteristics

* **Parallel Execution**: Independent agents in the same layer run concurrently
* **CPU Utilization**: Uses \~95% of available CPU cores
* **Compilation**: Pre-computes execution plan, cached for multi-loop runs
* **Memory**: Each agent maintains its own context
* **Graph Backend**: Rustworkx is faster for large graphs (>100 nodes)

## Common Patterns

### Research Pipeline

```python theme={null}
edges = [
    ("DataCollector", "Researcher"),
    ("Researcher", ["Analyst1", "Analyst2", "Analyst3"]),
    (["Analyst1", "Analyst2", "Analyst3"], "Synthesizer")
]
```

### Ensemble Decision Making

```python theme={null}
edges = [
    ("Coordinator", ["Expert1", "Expert2", "Expert3", "Expert4"]),
    (["Expert1", "Expert2", "Expert3", "Expert4"], "Aggregator"),
    ("Aggregator", "DecisionMaker")
]
```

### Multi-Stage Review

```python theme={null}
edges = [
    ("Creator", "TechnicalReviewer"),
    ("TechnicalReviewer", "BusinessReviewer"),
    ("BusinessReviewer", "LegalReviewer"),
    ("LegalReviewer", "FinalApprover")
]
```

## Related Classes

* [SequentialWorkflow](/api/sequential-workflow): For simple sequential execution
* [ConcurrentWorkflow](/api/concurrent-workflow): For parallel execution on same task
* [AgentRearrange](/api/agent-rearrange): For flexible flow-based orchestration
* [Agent](/api/agent): The base agent class used in workflows
