Overview

The multi_agent_exec module provides utility functions for running one or more agents under different execution strategies: synchronous and asynchronous single-agent runs, concurrent and batched multi-agent runs, and helpers that format information about agents and swarms.

Installation

pip install -U swarms

Function Overview

| Function | Category | Description |
| --- | --- | --- |
| run_single_agent | Single Agent | Runs a single agent synchronously |
| run_agent_async | Single Agent | Runs a single agent asynchronously using asyncio |
| run_agents_concurrently_async | Concurrent | Runs multiple agents concurrently using asyncio |
| run_agents_concurrently | Concurrent | Optimized concurrent runner with image support and flexible output formats |
| run_agents_concurrently_multiprocess | Concurrent | Manages agents concurrently in batches with optimized performance |
| batched_grid_agent_execution | Batched & Grid | Runs multiple agents with different tasks concurrently |
| run_agents_with_different_tasks | Batched & Grid | Runs agents with different tasks concurrently in batches |
| get_swarms_info | Utility | Fetches and formats information about available swarms |
| get_agents_info | Utility | Fetches and formats information about available agents |

Single Agent Functions

run_single_agent()

Runs a single agent synchronously.
def run_single_agent(agent: AgentType, task: str, *args, **kwargs) -> Any
Parameters:
  • agent (AgentType): Agent instance to run
  • task (str): Task string to execute
  • *args (Any): Additional positional arguments
  • **kwargs (Any): Additional keyword arguments
Returns: Agent execution result
from swarms import Agent
from swarms.structs.multi_agent_exec import run_single_agent

agent = Agent(
    agent_name="Financial-Analyst",
    system_prompt="You are a financial analysis expert",
    model_name="gpt-4o",
    max_loops=1
)

result = run_single_agent(agent, "Analyze the current stock market trends")
print(result)

run_agent_async()

Runs a single agent asynchronously using asyncio.
async def run_agent_async(agent: AgentType, task: str) -> Any
Parameters:
  • agent (AgentType): Agent instance to run
  • task (str): Task string to execute
Returns: Agent execution result
import asyncio
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agent_async

async def main():
    agent = Agent(
        agent_name="Researcher",
        system_prompt="You are a research assistant",
        model_name="gpt-4o",
        max_loops=1
    )

    result = await run_agent_async(agent, "Research AI advancements in 2024")
    print(result)

asyncio.run(main())
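
For readers wondering how a blocking agent call can be awaited at all, the following is a minimal sketch of one common pattern: handing the call to a worker thread with asyncio.to_thread. StubAgent and run_blocking_agent are hypothetical names used only for illustration; this is not necessarily how run_agent_async is implemented.

```python
import asyncio
import time


class StubAgent:
    """Hypothetical stand-in for an agent; it only exposes a blocking run()."""

    def __init__(self, agent_name: str) -> None:
        self.agent_name = agent_name

    def run(self, task: str) -> str:
        time.sleep(0.01)  # simulate a blocking model call
        return f"{self.agent_name} finished: {task}"


async def run_blocking_agent(agent: StubAgent, task: str) -> str:
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop stays free while the agent works.
    return await asyncio.to_thread(agent.run, task)


result = asyncio.run(run_blocking_agent(StubAgent("Researcher"), "summarize"))
print(result)
```

asyncio.to_thread requires Python 3.9 or later.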

Concurrent Execution Functions

run_agents_concurrently_async()

Runs multiple agents concurrently using asyncio.
async def run_agents_concurrently_async(
    agents: List[AgentType],
    task: str
) -> List[Any]
Parameters:
  • agents (List[AgentType]): List of Agent instances to run concurrently
  • task (str): Task string to execute by all agents
Returns: List of outputs from each agent
import asyncio
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_async

async def main():
    agents = [
        Agent(
            agent_name=f"Analyst-{i}",
            system_prompt="You are a market analyst",
            model_name="gpt-4o",
            max_loops=1
        )
        for i in range(3)
    ]

    task = "Analyze the impact of AI on job markets"
    results = await run_agents_concurrently_async(agents, task)

    for i, result in enumerate(results):
        print(f"Agent {i+1} result: {result}")

asyncio.run(main())
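
The fan-out pattern behind this kind of function can be sketched with asyncio.gather. The sketch below uses a hypothetical StubAgent and run_all helper, not the library's own code; it shows that gather returns results in the order the awaitables were passed, regardless of which agent finishes first. Whether run_agents_concurrently_async gives the same ordering guarantee is not stated here, so treat this as an illustration of the technique only.

```python
import asyncio
import time
from typing import List


class StubAgent:
    """Hypothetical stand-in for an agent with a blocking run() method."""

    def __init__(self, agent_name: str, delay: float) -> None:
        self.agent_name = agent_name
        self.delay = delay

    def run(self, task: str) -> str:
        time.sleep(self.delay)  # simulate a slow model call
        return f"{self.agent_name}: {task}"


async def run_all(agents: List[StubAgent], task: str) -> List[str]:
    # Each blocking run() goes to a worker thread; gather() waits for all of
    # them and returns results in the order the awaitables were passed in.
    return await asyncio.gather(
        *(asyncio.to_thread(agent.run, task) for agent in agents)
    )


# The first agent is the slowest, yet its result still comes first.
agents = [StubAgent("Analyst-0", 0.05), StubAgent("Analyst-1", 0.0)]
results = asyncio.run(run_all(agents, "ping"))
print(results)
```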

run_agents_concurrently()

Optimized concurrent agent runner using ThreadPoolExecutor with image support and flexible output formats.
def run_agents_concurrently(
    agents: List[AgentType],
    task: str,
    img: Optional[str] = None,
    max_workers: Optional[int] = None,
    return_agent_output_dict: bool = False,
) -> Union[List[Any], Dict[str, Any]]
Parameters:
  • agents (List[AgentType]): List of Agent instances to run concurrently
  • task (str): Task string to execute
  • img (Optional[str]): Optional image data to pass to agent run() if supported
  • max_workers (Optional[int]): Maximum number of threads in the executor. Defaults to 95% of CPU cores
  • return_agent_output_dict (bool): If True, returns a dict mapping agent names to outputs
Returns:
  • If return_agent_output_dict=False: List of outputs from each agent in completion order (exceptions included if agents fail)
  • If return_agent_output_dict=True: Dictionary mapping agent names to outputs, preserving agent input order
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently

# Create multiple agents
agents = [
    Agent(
        agent_name="Tech-Analyst",
        system_prompt="You are a technology analyst",
        model_name="gpt-4o",
        max_loops=1
    ),
    Agent(
        agent_name="Finance-Analyst",
        system_prompt="You are a financial analyst",
        model_name="gpt-4o",
        max_loops=1
    ),
    Agent(
        agent_name="Market-Strategist",
        system_prompt="You are a market strategist",
        model_name="gpt-4o",
        max_loops=1
    )
]

task = "Analyze the future of electric vehicles in 2025"

# Basic concurrent execution. The list is in completion order,
# so results[i] does not necessarily belong to agents[i].
results = run_agents_concurrently(agents, task, max_workers=4)
for i, result in enumerate(results):
    print(f"Result {i+1}: {result}")

# Return results as dictionary with agent names as keys
results_dict = run_agents_concurrently(
    agents, task, return_agent_output_dict=True
)
for agent_name, result in results_dict.items():
    print(f"{agent_name}: {result}")
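
The difference between the two return shapes (a list in completion order versus a dict in agent input order) can be illustrated with a small thread-pool sketch. StubAgent, run_concurrently, and the return_dict flag are hypothetical names for this example; the code is an assumption about how such ordering can arise, not a copy of the library's implementation, and it does not capture exceptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


class StubAgent:
    """Hypothetical stand-in for an agent with a blocking run() method."""

    def __init__(self, agent_name: str, delay: float) -> None:
        self.agent_name = agent_name
        self.delay = delay

    def run(self, task: str) -> str:
        time.sleep(self.delay)  # simulate a slow model call
        return f"{self.agent_name} done"


def run_concurrently(agents, task, return_dict=False):
    with ThreadPoolExecutor(max_workers=len(agents)) as executor:
        future_to_agent = {executor.submit(a.run, task): a for a in agents}
        if return_dict:
            # Pre-fill the keys in input order, then fill values as they finish.
            outputs = {a.agent_name: None for a in agents}
            for future in as_completed(future_to_agent):
                outputs[future_to_agent[future].agent_name] = future.result()
            return outputs
        # List form: collect results in the order the futures complete.
        return [future.result() for future in as_completed(future_to_agent)]


agents = [StubAgent("Slow", 0.3), StubAgent("Fast", 0.0)]
as_list = run_concurrently(agents, "ping")
as_dict = run_concurrently(agents, "ping", return_dict=True)
print(as_list)
print(list(as_dict))
```

With a list in completion order, the position of a result says nothing about which agent produced it, which is why the dict form is the safer choice when results need to be attributed.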

run_agents_concurrently_multiprocess()

Manages and runs multiple agents concurrently in batches with optimized performance.
def run_agents_concurrently_multiprocess(
    agents: List[Agent],
    task: str,
    batch_size: int = os.cpu_count()
) -> List[Any]
Parameters:
  • agents (List[Agent]): List of Agent instances to run concurrently
  • task (str): Task string to execute by all agents
  • batch_size (int): Number of agents to run in parallel in each batch. Defaults to CPU count
Returns: List of outputs from each agent
import os
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_multiprocess

agents = [
    Agent(
        agent_name=f"Research-Agent-{i}",
        system_prompt="You are a research specialist",
        model_name="gpt-4o",
        max_loops=1
    )
    for i in range(5)
]

task = "Research the benefits of renewable energy"
batch_size = os.cpu_count()
results = run_agents_concurrently_multiprocess(agents, task, batch_size)

print(f"Completed {len(results)} agent executions")
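
Batching itself is simple to picture: split the agent list into slices of batch_size and run one slice at a time. The sketch below is a hedged illustration using threads and hypothetical helpers (StubAgent, chunked, run_in_batches); it does not reproduce the library's internals.

```python
import os
from concurrent.futures import ThreadPoolExecutor


class StubAgent:
    """Hypothetical stand-in for an agent with a blocking run() method."""

    def __init__(self, agent_name: str) -> None:
        self.agent_name = agent_name

    def run(self, task: str) -> str:
        return f"{self.agent_name}: {task}"


def chunked(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def run_in_batches(agents, task, batch_size=None):
    # Fall back to the CPU count, or 1 if it cannot be determined.
    batch_size = batch_size or os.cpu_count() or 1
    results = []
    for batch in chunked(agents, batch_size):
        # Only one batch is in flight at a time, which caps concurrency.
        with ThreadPoolExecutor(max_workers=len(batch)) as executor:
            results.extend(executor.map(lambda agent: agent.run(task), batch))
    return results


agents = [StubAgent(f"Research-Agent-{i}") for i in range(5)]
batch_sizes = [len(batch) for batch in chunked(agents, 2)]
results = run_in_batches(agents, "ping", batch_size=2)
print(batch_sizes, len(results))
```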

Batched and Grid Execution

batched_grid_agent_execution()

Runs multiple agents with different tasks concurrently using batched grid execution.
def batched_grid_agent_execution(
    agents: List[AgentType],
    tasks: List[str],
    max_workers: int = None,
) -> List[Any]
Parameters:
  • agents (List[AgentType]): List of agent instances
  • tasks (List[str]): List of tasks, one for each agent
  • max_workers (int): Maximum number of threads to use. Defaults to 90% of CPU cores
Returns: List of results from each agent
Raises: ValueError if the number of agents doesn't match the number of tasks
from swarms import Agent
from swarms.structs.multi_agent_exec import batched_grid_agent_execution

agents = [
    Agent(
        agent_name="Data-Scientist",
        system_prompt="You are a data science expert",
        model_name="gpt-4o",
        max_loops=1
    ),
    Agent(
        agent_name="ML-Engineer",
        system_prompt="You are a machine learning engineer",
        model_name="gpt-4o",
        max_loops=1
    ),
    Agent(
        agent_name="AI-Researcher",
        system_prompt="You are an AI researcher",
        model_name="gpt-4o",
        max_loops=1
    )
]

tasks = [
    "Analyze machine learning algorithms performance",
    "Design a neural network architecture",
    "Research latest AI breakthroughs"
]

results = batched_grid_agent_execution(agents, tasks, max_workers=3)

for i, result in enumerate(results):
    print(f"Task {i+1}: {tasks[i]}")
    print(f"Result: {result}\n")
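
The one-task-per-agent contract, including the ValueError on a length mismatch, can be sketched as follows. StubAgent and run_pairwise are hypothetical names, and the error message is illustrative; the real function's message and internals may differ.

```python
from concurrent.futures import ThreadPoolExecutor


class StubAgent:
    """Hypothetical stand-in for an agent with a blocking run() method."""

    def __init__(self, agent_name: str) -> None:
        self.agent_name = agent_name

    def run(self, task: str) -> str:
        return f"{self.agent_name} -> {task}"


def run_pairwise(agents, tasks, max_workers=None):
    # Validate up front so a mismatch fails before any agent starts.
    if len(agents) != len(tasks):
        raise ValueError("The number of agents must match the number of tasks")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() pairs agents[i] with tasks[i] and yields results in input order.
        return list(
            executor.map(lambda agent, task: agent.run(task), agents, tasks)
        )


agents = [StubAgent("Data-Scientist"), StubAgent("ML-Engineer")]
tasks = ["analyze", "design"]
results = run_pairwise(agents, tasks, max_workers=2)
print(results)
```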

run_agents_with_different_tasks()

Runs multiple agents with different tasks concurrently, processing them in batches.
def run_agents_with_different_tasks(
    agent_task_pairs: List[tuple[AgentType, str]],
    batch_size: int = 10,
    max_workers: int = None,
) -> List[Any]
Parameters:
  • agent_task_pairs (List[tuple[AgentType, str]]): List of (agent, task) tuples
  • batch_size (int): Number of agents to run in parallel in each batch. Default: 10
  • max_workers (int): Maximum number of threads
Returns: List of outputs from each agent, in the same order as input pairs
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_with_different_tasks

# Create agents
agents = [
    Agent(
        agent_name="Content-Writer",
        system_prompt="You are a content writer",
        model_name="gpt-4o",
        max_loops=1
    ),
    Agent(
        agent_name="Editor",
        system_prompt="You are an editor",
        model_name="gpt-4o",
        max_loops=1
    ),
    Agent(
        agent_name="SEO-Specialist",
        system_prompt="You are an SEO specialist",
        model_name="gpt-4o",
        max_loops=1
    )
]

# Create agent-task pairs
agent_task_pairs = [
    (agents[0], "Write a blog post about sustainable living"),
    (agents[1], "Edit and improve this article draft"),
    (agents[2], "Optimize this content for SEO")
]

results = run_agents_with_different_tasks(agent_task_pairs, batch_size=2)

for i, result in enumerate(results):
    agent, task = agent_task_pairs[i]
    print(f"{agent.agent_name} - {task}: {result}")

Utility Functions

get_swarms_info()

Fetches and formats information about all available swarms in the system.
def get_swarms_info(swarms: List[Callable]) -> str
Parameters:
  • swarms (List[Callable]): List of swarm objects to get information about
Returns: Formatted string containing names and descriptions of all swarms
from swarms.structs.multi_agent_exec import get_swarms_info

swarms = [
    # Your swarm objects here
]

info = get_swarms_info(swarms)
print(info)
# Output:
# Available Swarms:
#
# [Swarm 1]
# Name: ResearchSwarm
# Description: A swarm for research tasks
# Length of Agents: 3
# Swarm Type: hierarchical

get_agents_info()

Fetches and formats information about all available agents in the system.
def get_agents_info(
    agents: List[Union[Agent, Callable]],
    team_name: str = None
) -> str
Parameters:
  • agents (List[Union[Agent, Callable]]): List of agent objects to get information about
  • team_name (str, optional): Optional team name to display
Returns: Formatted string containing names and descriptions of all agents
from swarms import Agent
from swarms.structs.multi_agent_exec import get_agents_info

agents = [
    Agent(
        agent_name="Research-Agent",
        system_prompt="You are a research assistant",
        model_name="gpt-4o",
        max_loops=2,
        role="Researcher"
    ),
    Agent(
        agent_name="Analysis-Agent",
        system_prompt="You are a data analyst",
        model_name="gpt-4o",
        max_loops=1,
        role="Analyst"
    )
]

info = get_agents_info(agents, team_name="Data Team")
print(info)

Advanced Multi-Agent Workflow Example

from swarms import Agent
from swarms.structs.multi_agent_exec import (
    run_agents_concurrently,
    run_agents_with_different_tasks,
    batched_grid_agent_execution,
    get_agents_info
)

# Create specialized agents
agents = [
    Agent(
        agent_name="Market-Researcher",
        system_prompt="You are a market research expert specializing in consumer behavior",
        model_name="gpt-4o",
        max_loops=1,
        role="Researcher"
    ),
    Agent(
        agent_name="Data-Analyst",
        system_prompt="You are a data analyst expert in statistical analysis",
        model_name="gpt-4o",
        max_loops=1,
        role="Analyst"
    ),
    Agent(
        agent_name="Strategy-Consultant",
        system_prompt="You are a strategy consultant specializing in business development",
        model_name="gpt-4o",
        max_loops=1,
        role="Consultant"
    ),
    Agent(
        agent_name="Financial-Advisor",
        system_prompt="You are a financial advisor specializing in investment strategies",
        model_name="gpt-4o",
        max_loops=1,
        role="Advisor"
    )
]

# Display agent information
print("=== Agent Information ===")
print(get_agents_info(agents, "Business Intelligence Team"))

# Same task for all agents (concurrent execution).
# The list is in completion order, so results[i] may not belong to agents[i].
task = "Analyze the impact of remote work trends on the commercial real estate market"
results = run_agents_concurrently(agents, task, max_workers=4)

for i, result in enumerate(results):
    print(f"\nAnalysis {i+1}:")
    print(f"Result: {result}")

# Dictionary output format
results_dict = run_agents_concurrently(
    agents, task, return_agent_output_dict=True, max_workers=4
)
for agent_name, result in results_dict.items():
    print(f"\n{agent_name} Analysis:")
    print(f"Result: {result}")

# Different tasks for different agents
agent_task_pairs = [
    (agents[0], "Research consumer preferences for electric vehicles"),
    (agents[1], "Analyze sales data for EV market penetration"),
    (agents[2], "Develop marketing strategy for EV adoption"),
    (agents[3], "Assess financial viability of EV charging infrastructure")
]

results = run_agents_with_different_tasks(agent_task_pairs, batch_size=2)

# Grid execution with matched agents and tasks
grid_agents = agents[:3]
grid_tasks = [
    "Forecast market trends for renewable energy",
    "Evaluate risk factors in green technology investments",
    "Compare traditional vs sustainable investment portfolios"
]

grid_results = batched_grid_agent_execution(grid_agents, grid_tasks, max_workers=3)

Error Handling and Best Practices

from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)

# Create agents
agents = [
    Agent(
        agent_name=f"Agent-{i}",
        system_prompt="You are a helpful assistant",
        model_name="gpt-4o",
        max_loops=1
    )
    for i in range(5)
]

task = "Perform a complex analysis task"

try:
    results = run_agents_concurrently(agents, task, max_workers=4)

    # Results are in completion order; a failed agent shows up as an Exception
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Run {i+1} failed with error: {result}")
        else:
            print(f"Run {i+1} succeeded: {result}")

except Exception as e:
    print(f"Execution failed: {e}")
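
The "exceptions as results" pattern used above can be sketched in isolation. In this hedged example, FlakyAgent, safe_run, and run_and_partition are hypothetical helpers; the point is only to show how a failure can be returned as a value and then separated from the successes by agent name.

```python
from concurrent.futures import ThreadPoolExecutor


class FlakyAgent:
    """Hypothetical agent that can be told to fail, for demonstration."""

    def __init__(self, agent_name: str, should_fail: bool = False) -> None:
        self.agent_name = agent_name
        self.should_fail = should_fail

    def run(self, task: str) -> str:
        if self.should_fail:
            raise RuntimeError(f"{self.agent_name} could not complete the task")
        return f"{self.agent_name} completed: {task}"


def safe_run(agent, task):
    # Return the exception instead of raising, so one failure
    # does not discard the results of the other agents.
    try:
        return agent.run(task)
    except Exception as exc:
        return exc


def run_and_partition(agents, task):
    with ThreadPoolExecutor(max_workers=len(agents)) as executor:
        outcomes = list(executor.map(lambda agent: safe_run(agent, task), agents))
    successes, failures = {}, {}
    for agent, outcome in zip(agents, outcomes):
        target = failures if isinstance(outcome, Exception) else successes
        target[agent.agent_name] = outcome
    return successes, failures


agents = [FlakyAgent("Agent-0"), FlakyAgent("Agent-1", should_fail=True), FlakyAgent("Agent-2")]
successes, failures = run_and_partition(agents, "ping")
print(sorted(successes), sorted(failures))
```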

Performance Considerations

| Technique | Best Use Case / Description |
| --- | --- |
| ThreadPoolExecutor | Best for I/O-bound work such as model API calls; supports image inputs and flexible output formats |
| Batch Processing | Prevents system overload with large numbers of agents, maintains order with grid execution |
| Resource Monitoring | Adjust worker counts based on system capabilities (defaults to 95% of CPU cores) |
| Async/Await | Use async functions for better concurrency control and platform optimizations |
| Image Support | Pass image data to agents that support multimodal processing for enhanced capabilities |
| Dictionary Output | Use return_agent_output_dict=True for structured results with agent name mapping |
| Error Handling | All functions include comprehensive exception handling with graceful fallbacks |

Best Practices

  1. Always handle exceptions in results, as some agents may fail
  2. Use appropriate max_workers based on system resources
  3. Monitor memory usage for large agent counts
  4. Consider batch processing for very large numbers of agents
  5. Use return_agent_output_dict=True for structured, named results
  6. Pass image data to agents that support multimodal processing
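
As a concrete take on choosing max_workers, the sketch below derives a worker count from the CPU count and caps it at the number of agents. The worker_count helper is hypothetical and the 0.95 fraction simply mirrors the default described on this page; tune both to your own workload.

```python
import os


def worker_count(num_agents: int, cpu_fraction: float = 0.95) -> int:
    """Pick a thread count from the CPU budget, capped at the agent count."""
    cpu_cores = os.cpu_count() or 1  # cpu_count() may return None
    cpu_budget = max(1, int(cpu_cores * cpu_fraction))
    # There is no benefit in starting more threads than there are agents.
    return max(1, min(cpu_budget, num_agents))


print(worker_count(num_agents=4))
```

The returned value can be passed as max_workers to the functions above that accept it.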

Source Code

View the source code on GitHub