> ## Documentation Index
> Fetch the complete documentation index at: https://docs.swarms.world/llms.txt
> Use this file to discover all available pages before exploring further.

# ConcurrentWorkflow

> A concurrent workflow system for running multiple agents simultaneously on the same task

## Overview

The `ConcurrentWorkflow` class provides a framework for executing multiple agents concurrently on the same task, with optional dashboard monitoring, streaming callbacks, and various output formatting options. It uses ThreadPoolExecutor to manage concurrent execution and provides real-time status tracking for each agent.

## Key Features

* **Concurrent Execution**: Run multiple agents simultaneously on the same task
* **Real-time Dashboard**: Monitor agent status and outputs in real-time
* **Streaming Callbacks**: Get real-time updates as agents generate outputs
* **Flexible Output Formatting**: Multiple output format options
* **Auto-save Support**: Automatically save conversation history
* **Error Handling**: Graceful error handling with status tracking
* **Batch Processing**: Process multiple tasks sequentially with concurrent agents

## Installation

```bash theme={null}
pip install -U swarms
```

## Class Definition

```python theme={null}
class ConcurrentWorkflow:
    def __init__(
        self,
        id: str = None,
        name: str = "ConcurrentWorkflow",
        description: str = "Execution of multiple agents concurrently",
        agents: List[Union[Agent, Callable]] = None,
        auto_save: bool = True,
        output_type: str = "dict-all-except-first",
        max_loops: int = 1,
        auto_generate_prompts: bool = False,
        show_dashboard: bool = False,
        autosave: bool = True,
        verbose: bool = False,
    )
```

## Parameters

<ParamField path="id" type="str" default="auto-generated">
  Unique identifier for the workflow instance. Auto-generated if not provided.
</ParamField>

<ParamField path="name" type="str" default="ConcurrentWorkflow">
  Human-readable name for the workflow
</ParamField>

<ParamField path="description" type="str" default="Execution of multiple agents concurrently">
  Description of the workflow's purpose
</ParamField>

<ParamField path="agents" type="List[Union[Agent, Callable]]" required>
  List of agents to execute concurrently. Must not be None or empty.
</ParamField>

<ParamField path="auto_save" type="bool" default="True">
  Whether to automatically save workflow metadata
</ParamField>

<ParamField path="output_type" type="str" default="dict-all-except-first">
  Format for output formatting. Options include "dict-all-except-first", "dict", "list", "str"
</ParamField>

<ParamField path="max_loops" type="int" default="1">
  Maximum number of execution loops (currently unused in concurrent execution)
</ParamField>

<ParamField path="auto_generate_prompts" type="bool" default="False">
  Whether to enable automatic prompt engineering for all agents
</ParamField>

<ParamField path="show_dashboard" type="bool" default="False">
  Whether to display real-time dashboard during execution
</ParamField>

<ParamField path="autosave" type="bool" default="True">
  Whether to automatically save conversation history to workspace
</ParamField>

<ParamField path="verbose" type="bool" default="False">
  Whether to enable verbose logging
</ParamField>

## Methods

### `run(task, img=None, imgs=None, streaming_callback=None)`

Execute all agents concurrently on the given task.

<ParamField path="task" type="str" required>
  The task to be executed by all agents
</ParamField>

<ParamField path="img" type="Optional[str]" default="None">
  Single image path for agents that support image input
</ParamField>

<ParamField path="imgs" type="Optional[List[str]]" default="None">
  List of image paths for agents that support multiple images
</ParamField>

<ParamField path="streaming_callback" type="Optional[Callable[[str, str, bool], None]]" default="None">
  Callback function for streaming updates. Called with (agent\_name, chunk, is\_final) parameters.
</ParamField>

<ResponseField name="result" type="Union[Dict, List, str]">
  Formatted conversation history based on output\_type
</ResponseField>

### `batch_run(tasks, imgs=None, streaming_callback=None)`

Execute workflow on multiple tasks sequentially.

<ParamField path="tasks" type="List[str]" required>
  List of tasks to be executed
</ParamField>

<ParamField path="imgs" type="Optional[List[str]]" default="None">
  List of image paths corresponding to each task
</ParamField>

<ParamField path="streaming_callback" type="Optional[Callable[[str, str, bool], None]]" default="None">
  Callback function for streaming updates
</ParamField>

<ResponseField name="results" type="List[Union[Dict, List, str]]">
  List of results for each task
</ResponseField>

### `run_with_dashboard(task, img=None, imgs=None, streaming_callback=None)`

Execute agents with real-time dashboard monitoring.

<ParamField path="task" type="str" required>
  The task to be executed by all agents
</ParamField>

<ParamField path="img" type="Optional[str]" default="None">
  Single image path for agents that support image input
</ParamField>

<ParamField path="imgs" type="Optional[List[str]]" default="None">
  List of image paths for agents that support multiple images
</ParamField>

<ParamField path="streaming_callback" type="Optional[Callable[[str, str, bool], None]]" default="None">
  Callback function for streaming updates
</ParamField>

<ResponseField name="result" type="Union[Dict, List, str]">
  Formatted conversation history based on output\_type
</ResponseField>

### `cleanup()`

Clean up resources and connections. Called automatically after each run.

### `reliability_check()`

Validate workflow configuration.

**Raises:**

* `ValueError`: If no agents are provided or agents list is empty

### `activate_auto_prompt_engineering()`

Enable automatic prompt engineering for all agents in the workflow.

### `display_agent_dashboard(title="ConcurrentWorkflow Dashboard", is_final=False)`

Display real-time dashboard showing agent status and outputs.

<ParamField path="title" type="str" default="ConcurrentWorkflow Dashboard">
  Title to display for the dashboard
</ParamField>

<ParamField path="is_final" type="bool" default="False">
  Whether this is the final dashboard display
</ParamField>

## Attributes

| Attribute              | Type                           | Description                                         |
| ---------------------- | ------------------------------ | --------------------------------------------------- |
| `id`                   | str                            | Unique identifier for the workflow instance         |
| `name`                 | str                            | Human-readable name for the workflow                |
| `description`          | str                            | Description of the workflow's purpose               |
| `agents`               | List\[Union\[Agent, Callable]] | List of agents to execute concurrently              |
| `agent_statuses`       | dict                           | Dictionary tracking status and output of each agent |
| `conversation`         | Conversation                   | Conversation object for storing agent interactions  |
| `metadata_output_path` | str                            | Path for saving workflow metadata                   |

## Usage Examples

### Basic Concurrent Workflow

```python theme={null}
from swarms import Agent, ConcurrentWorkflow
from swarms.models import OpenAIChat

# Initialize the language model
llm = OpenAIChat(
    model_name="claude-sonnet-4-6",
    temperature=0.5,
)

# Create specialized agents with different perspectives
technical_analyst = Agent(
    agent_name="Technical-Analyst",
    llm=llm,
    max_loops=1,
    system_prompt="You are a technical analyst. Analyze from a technical perspective."
)

business_analyst = Agent(
    agent_name="Business-Analyst",
    llm=llm,
    max_loops=1,
    system_prompt="You are a business analyst. Analyze from a business perspective."
)

user_researcher = Agent(
    agent_name="User-Researcher",
    llm=llm,
    max_loops=1,
    system_prompt="You are a user researcher. Analyze from a user experience perspective."
)

# Create concurrent workflow
workflow = ConcurrentWorkflow(
    name="Multi-Perspective Analysis",
    description="Analyze from multiple perspectives simultaneously",
    agents=[technical_analyst, business_analyst, user_researcher],
    verbose=True
)

# Run all agents concurrently on the same task
result = workflow.run("Analyze the potential of implementing AI chatbots in customer service")
print(result)
```

### With Real-Time Dashboard

```python theme={null}
# Create workflow with dashboard enabled
workflow = ConcurrentWorkflow(
    name="Analysis with Dashboard",
    agents=[technical_analyst, business_analyst, user_researcher],
    show_dashboard=True,  # Enable real-time dashboard
    verbose=True
)

# Run and watch the dashboard update in real-time
result = workflow.run("Evaluate the impact of remote work on productivity")
```

### With Streaming Callbacks

```python theme={null}
def handle_stream(agent_name: str, chunk: str, is_final: bool):
    """Custom streaming callback to handle real-time updates"""
    if is_final:
        print(f"\n[{agent_name}] Completed!")
    else:
        print(f"[{agent_name}] {chunk}", end="", flush=True)

workflow = ConcurrentWorkflow(
    name="Streaming Analysis",
    agents=[technical_analyst, business_analyst],
)

# Get real-time streaming updates
result = workflow.run(
    "Analyze cybersecurity trends",
    streaming_callback=handle_stream
)
```

### Batch Processing

```python theme={null}
# Process multiple tasks sequentially, with agents running concurrently on each
tasks = [
    "Analyze the impact of AI on healthcare",
    "Evaluate renewable energy solutions",
    "Assess blockchain technology trends"
]

workflow = ConcurrentWorkflow(
    name="Batch Analysis",
    agents=[technical_analyst, business_analyst, user_researcher],
)

results = workflow.batch_run(tasks)

for i, (task, result) in enumerate(zip(tasks, results), 1):
    print(f"\nTask {i}: {task}")
    print(f"Result: {result}")
    print("-" * 80)
```

### With Image Input

```python theme={null}
from swarms import Agent
from swarms.models import GPT4VisionAPI

# Create vision-capable agents
vision_llm = GPT4VisionAPI()

image_analyst1 = Agent(
    agent_name="Image-Analyst-1",
    llm=vision_llm,
    system_prompt="Analyze images for technical details"
)

image_analyst2 = Agent(
    agent_name="Image-Analyst-2",
    llm=vision_llm,
    system_prompt="Analyze images for aesthetic quality"
)

workflow = ConcurrentWorkflow(
    agents=[image_analyst1, image_analyst2],
)

# Analyze image with multiple agents concurrently
result = workflow.run(
    "Analyze this product image",
    img="path/to/product.jpg"
)
```

### Custom Output Types

```python theme={null}
# Different output formats
workflow_dict = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    output_type="dict"  # Returns dict with all agent outputs
)

workflow_list = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    output_type="list"  # Returns list of outputs
)

workflow_dict_except_first = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    output_type="dict-all-except-first"  # Skip first agent in output dict
)
```

### With Autosave

```python theme={null}
import os

# Set workspace directory
os.environ["WORKSPACE_DIR"] = "./analysis_workspace"

workflow = ConcurrentWorkflow(
    name="Saved-Analysis",
    agents=[technical_analyst, business_analyst],
    autosave=True,  # Save conversation history
    verbose=True
)

result = workflow.run("Analyze market trends")
# Conversation saved to ./analysis_workspace/swarms/ConcurrentWorkflow/
```

### Auto Prompt Engineering

```python theme={null}
# Enable automatic prompt engineering
workflow = ConcurrentWorkflow(
    agents=[technical_analyst, business_analyst],
    auto_generate_prompts=True  # Agents will auto-optimize their prompts
)

workflow.activate_auto_prompt_engineering()
result = workflow.run("Complex analysis task")
```

## Error Handling

```python theme={null}
try:
    workflow = ConcurrentWorkflow(
        agents=[technical_analyst, business_analyst],
        show_dashboard=True
    )
    result = workflow.run("Analyze this topic")
except ValueError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Execution error: {e}")
finally:
    # Cleanup is called automatically
    pass
```

## Dashboard Output Example

When `show_dashboard=True`, you'll see real-time updates like:

```
╔═══════════════════════════════════════════════════════════════╗
║           ConcurrentWorkflow Dashboard                         ║
╠═══════════════════════════════════════════════════════════════╣
║ Agent: Technical-Analyst                                       ║
║ Status: running                                                ║
║ Output: Analyzing technical aspects...                         ║
╠═══════════════════════════════════════════════════════════════╣
║ Agent: Business-Analyst                                        ║
║ Status: completed                                              ║
║ Output: The business impact is significant...                  ║
╠═══════════════════════════════════════════════════════════════╣
║ Agent: User-Researcher                                         ║
║ Status: pending                                                ║
║ Output:                                                        ║
╚═══════════════════════════════════════════════════════════════╝
```

## Best Practices

1. **Agent Diversity**: Use agents with different perspectives for richer analysis
2. **Dashboard for Monitoring**: Enable dashboard during development to monitor agent progress
3. **Streaming Callbacks**: Use streaming callbacks for real-time feedback in production
4. **Error Handling**: Always wrap concurrent execution in try-except blocks
5. **Resource Management**: Be mindful of API rate limits when running many agents
6. **Task Design**: Ensure the task benefits from multiple concurrent perspectives
7. **Autosave**: Enable autosave for important analyses

## Common Use Cases

* **Multi-Perspective Analysis**: Get technical, business, and user perspectives simultaneously
* **Consensus Building**: Run multiple agents and aggregate their outputs
* **Parallel Research**: Research different aspects of a topic concurrently
* **Voting Systems**: Multiple agents vote on decisions
* **Quality Assurance**: Multiple agents review the same content
* **Competitive Analysis**: Different agents analyze competing solutions

## Performance Considerations

* **CPU Utilization**: Uses \~95% of available CPU cores by default
* **Concurrent Execution**: All agents run truly concurrently using ThreadPoolExecutor
* **Memory Usage**: Each agent maintains its own context, consider memory for many agents
* **API Rate Limits**: Be aware of rate limits when using cloud-based LLMs

## Related Classes

* [SequentialWorkflow](/api/sequential-workflow): For sequential agent execution
* [AgentRearrange](/api/agent-rearrange): For complex orchestration patterns
* [GraphWorkflow](/api/graph-workflow): For DAG-based workflows
* [Agent](/api/agent): The base agent class used in workflows
