Overview
The multi_agent_exec module provides utility functions for running one or more agents under different execution strategies. It covers synchronous and asynchronous single-agent runs, concurrent and batched multi-agent runs, and helpers that format information about available agents and swarms.
Installation
Function Overview
| Function | Category | Description |
|---|---|---|
| run_single_agent | Single Agent | Runs a single agent synchronously |
| run_agent_async | Single Agent | Runs a single agent asynchronously using asyncio |
| run_agents_concurrently_async | Concurrent | Runs multiple agents concurrently using asyncio |
| run_agents_concurrently | Concurrent | Optimized concurrent runner with image support and flexible output formats |
| run_agents_concurrently_multiprocess | Concurrent | Manages agents concurrently in batches with optimized performance |
| batched_grid_agent_execution | Batched & Grid | Runs multiple agents with different tasks concurrently |
| run_agents_with_different_tasks | Batched & Grid | Runs agents with different tasks concurrently in batches |
| get_swarms_info | Utility | Fetches and formats information about available swarms |
| get_agents_info | Utility | Fetches and formats information about available agents |
Single Agent Functions
run_single_agent()
Runs a single agent synchronously.
def run_single_agent(agent: AgentType, task: str, *args, **kwargs) -> Any
Parameters:
agent (AgentType): Agent instance to run
task (str): Task string to execute
*args (Any): Additional positional arguments
**kwargs (Any): Additional keyword arguments
Returns: Agent execution result
from swarms import Agent
from swarms.structs.multi_agent_exec import run_single_agent
agent = Agent(
agent_name="Financial-Analyst",
system_prompt="You are a financial analysis expert",
model_name="gpt-4o",
max_loops=1
)
result = run_single_agent(agent, "Analyze the current stock market trends")
print(result)
run_agent_async()
Runs a single agent asynchronously using asyncio.
async def run_agent_async(agent: AgentType, task: str) -> Any
Parameters:
agent (AgentType): Agent instance to run
task (str): Task string to execute
Returns: Agent execution result
import asyncio
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agent_async
async def main():
    agent = Agent(
        agent_name="Researcher",
        system_prompt="You are a research assistant",
        model_name="gpt-4o",
        max_loops=1
    )
    result = await run_agent_async(agent, "Research AI advancements in 2024")
    print(result)

asyncio.run(main())
Concurrent Execution Functions
run_agents_concurrently_async()
Runs multiple agents concurrently using asyncio.
async def run_agents_concurrently_async(
agents: List[AgentType],
task: str
) -> List[Any]
Parameters:
agents (List[AgentType]): List of Agent instances to run concurrently
task (str): Task string to execute by all agents
Returns: List of outputs from each agent
import asyncio
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_async
async def main():
    agents = [
        Agent(
            agent_name=f"Analyst-{i}",
            system_prompt="You are a market analyst",
            model_name="gpt-4o",
            max_loops=1
        )
        for i in range(3)
    ]
    task = "Analyze the impact of AI on job markets"
    results = await run_agents_concurrently_async(agents, task)
    for i, result in enumerate(results):
        print(f"Agent {i+1} result: {result}")

asyncio.run(main())
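The idea of running several blocking agent calls concurrently under asyncio can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `StubAgent` and `run_all` are hypothetical names, and the sleep stands in for a blocking model call.

```python
import asyncio
import time


class StubAgent:
    """Hypothetical stand-in for an agent; it only mimics a blocking run()."""

    def __init__(self, name: str, delay: float):
        self.agent_name = name
        self.delay = delay

    def run(self, task: str) -> str:
        time.sleep(self.delay)  # simulate a blocking LLM call
        return f"{self.agent_name} finished: {task}"


async def run_all(agents, task):
    # Offload each blocking run() to a worker thread (Python 3.9+).
    pending = [asyncio.to_thread(agent.run, task) for agent in agents]
    # gather() returns results in the order the awaitables were passed in,
    # regardless of which one finishes first.
    return await asyncio.gather(*pending)


stub_agents = [StubAgent("A", 0.2), StubAgent("B", 0.05), StubAgent("C", 0.1)]
outputs = asyncio.run(run_all(stub_agents, "demo task"))
print(outputs)
```

Because `asyncio.gather` keeps input order, the first output here belongs to agent A even though it is the slowest.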
run_agents_concurrently()
Optimized concurrent agent runner using ThreadPoolExecutor with image support and flexible output formats.
def run_agents_concurrently(
agents: List[AgentType],
task: str,
img: Optional[str] = None,
max_workers: Optional[int] = None,
return_agent_output_dict: bool = False,
) -> Union[List[Any], Dict[str, Any]]
Parameters:
agents (List[AgentType]): List of Agent instances to run concurrently
task (str): Task string to execute
img (Optional[str]): Optional image data to pass to agent run() if supported
max_workers (Optional[int]): Maximum number of threads in the executor. Defaults to 95% of CPU cores
return_agent_output_dict (bool): If True, returns a dict mapping agent names to outputs
Returns:
- If return_agent_output_dict=False: List of outputs from each agent in completion order (exceptions included if agents fail)
- If return_agent_output_dict=True: Dictionary mapping agent names to outputs, preserving agent input order
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently
# Create multiple agents
agents = [
Agent(
agent_name="Tech-Analyst",
system_prompt="You are a technology analyst",
model_name="gpt-4o",
max_loops=1
),
Agent(
agent_name="Finance-Analyst",
system_prompt="You are a financial analyst",
model_name="gpt-4o",
max_loops=1
),
Agent(
agent_name="Market-Strategist",
system_prompt="You are a market strategist",
model_name="gpt-4o",
max_loops=1
)
]
task = "Analyze the future of electric vehicles in 2025"
# Basic concurrent execution
results = run_agents_concurrently(agents, task, max_workers=4)
# The list is in completion order, so results[i] does not necessarily belong to agents[i]
for i, result in enumerate(results):
    print(f"Result {i+1}: {result}")
# Return results as dictionary with agent names as keys
results_dict = run_agents_concurrently(
agents, task, return_agent_output_dict=True
)
for agent_name, result in results_dict.items():
    print(f"{agent_name}: {result}")
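The difference between completion order and input order noted under Returns can be seen with a plain `ThreadPoolExecutor`. The sketch below uses a hypothetical `fake_run` function in place of a real agent call and is only meant to show the ordering behavior, not how the library is written.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_run(name: str, delay: float) -> str:
    """Hypothetical stand-in for agent.run(); sleeps, then returns its name."""
    time.sleep(delay)
    return name


jobs = [("slow", 0.3), ("fast", 0.05), ("medium", 0.15)]

with ThreadPoolExecutor(max_workers=3) as pool:
    # Remember which name each future belongs to.
    futures = {pool.submit(fake_run, name, delay): name for name, delay in jobs}

    # Completion order: whichever future finishes first is yielded first.
    completion_order = [future.result() for future in as_completed(futures)]

    # Name-keyed results: the dict keeps the order in which jobs were submitted.
    by_name = {name: future.result() for future, name in futures.items()}

print(completion_order)  # typically the fastest job comes first
print(list(by_name))     # submission order
```

When the caller needs to know which agent produced which output, a name-keyed mapping avoids relying on list positions.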
run_agents_concurrently_multiprocess()
Manages and runs multiple agents concurrently in batches with optimized performance.
def run_agents_concurrently_multiprocess(
agents: List[Agent],
task: str,
batch_size: int = os.cpu_count()
) -> List[Any]
Parameters:
agents (List[Agent]): List of Agent instances to run concurrently
task (str): Task string to execute by all agents
batch_size (int): Number of agents to run in parallel in each batch. Defaults to CPU count
Returns: List of outputs from each agent
import os
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_multiprocess
agents = [
Agent(
agent_name=f"Research-Agent-{i}",
system_prompt="You are a research specialist",
model_name="gpt-4o",
max_loops=1
)
for i in range(5)
]
task = "Research the benefits of renewable energy"
batch_size = os.cpu_count()
results = run_agents_concurrently_multiprocess(agents, task, batch_size)
print(f"Completed {len(results)} agent executions")
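Batching, as described for batch_size, amounts to splitting the agent list into consecutive chunks and running one chunk at a time. The following is a rough sketch using threads and plain callables; `chunked` and `run_in_batches` are hypothetical helpers, and the library may use a different executor internally. Note that `os.cpu_count()` can return `None`, so the sketch falls back to 1.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def chunked(items: Sequence[Any], size: int) -> List[Sequence[Any]]:
    """Split items into consecutive slices of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def run_in_batches(
    workers: Sequence[Callable[[str], Any]], task: str, batch_size: int
) -> List[Any]:
    """Run every worker on the same task, one batch at a time."""
    results: List[Any] = []
    for batch in chunked(workers, batch_size):
        with ThreadPoolExecutor(max_workers=len(batch)) as pool:
            # map() yields results in input order within the batch.
            results.extend(pool.map(lambda worker: worker(task), batch))
    return results


# Hypothetical workers standing in for agent.run
workers = [lambda task, i=i: f"worker-{i}: {task}" for i in range(5)]

default_batch_size = os.cpu_count() or 1  # guard against a None CPU count
batched = run_in_batches(workers, "demo", batch_size=2)
print(batched)
```

With five workers and a batch size of 2, the work runs as three batches of 2, 2, and 1.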
Batched and Grid Execution
batched_grid_agent_execution()
Runs multiple agents with different tasks concurrently using batched grid execution.
def batched_grid_agent_execution(
agents: List[AgentType],
tasks: List[str],
max_workers: int = None,
) -> List[Any]
Parameters:
agents (List[AgentType]): List of agent instances
tasks (List[str]): List of tasks, one for each agent
max_workers (int): Maximum number of threads to use. Defaults to 90% of CPU cores
Returns: List of results from each agent
Raises: ValueError if the number of agents doesn’t match the number of tasks
from swarms import Agent
from swarms.structs.multi_agent_exec import batched_grid_agent_execution
agents = [
Agent(
agent_name="Data-Scientist",
system_prompt="You are a data science expert",
model_name="gpt-4o",
max_loops=1
),
Agent(
agent_name="ML-Engineer",
system_prompt="You are a machine learning engineer",
model_name="gpt-4o",
max_loops=1
),
Agent(
agent_name="AI-Researcher",
system_prompt="You are an AI researcher",
model_name="gpt-4o",
max_loops=1
)
]
tasks = [
"Analyze machine learning algorithms performance",
"Design a neural network architecture",
"Research latest AI breakthroughs"
]
results = batched_grid_agent_execution(agents, tasks, max_workers=3)
for i, result in enumerate(results):
    print(f"Task {i+1}: {tasks[i]}")
    print(f"Result: {result}\n")
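The one-task-per-agent contract and the ValueError on a length mismatch can be illustrated as follows. This is a sketch under assumptions: `run_paired` is a hypothetical helper, plain callables stand in for agents, and the error message is invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional, Sequence


def run_paired(
    workers: Sequence[Callable[[str], Any]],
    tasks: Sequence[str],
    max_workers: Optional[int] = None,
) -> List[Any]:
    """Run workers[i] on tasks[i] concurrently and return results in input order."""
    if len(workers) != len(tasks):
        raise ValueError(
            f"expected one task per worker: {len(workers)} workers, {len(tasks)} tasks"
        )
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() accepts several iterables and pairs their elements positionally.
        return list(pool.map(lambda worker, task: worker(task), workers, tasks))


paired = run_paired([str.upper, str.lower], ["Ab", "Cd"])
print(paired)

try:
    run_paired([str.upper], ["a", "b"])
except ValueError as exc:
    mismatch_message = str(exc)
    print(mismatch_message)
```

Validating the lengths up front fails fast instead of silently dropping the unmatched tasks.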
run_agents_with_different_tasks()
Runs multiple agents with different tasks concurrently, processing them in batches.
def run_agents_with_different_tasks(
agent_task_pairs: List[tuple[AgentType, str]],
batch_size: int = 10,
max_workers: int = None,
) -> List[Any]
Parameters:
agent_task_pairs (List[tuple[AgentType, str]]): List of (agent, task) tuples
batch_size (int): Number of agents to run in parallel in each batch. Default: 10
max_workers (int): Maximum number of threads
Returns: List of outputs from each agent, in the same order as input pairs
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_with_different_tasks
# Create agents
agents = [
Agent(
agent_name="Content-Writer",
system_prompt="You are a content writer",
model_name="gpt-4o",
max_loops=1
),
Agent(
agent_name="Editor",
system_prompt="You are an editor",
model_name="gpt-4o",
max_loops=1
),
Agent(
agent_name="SEO-Specialist",
system_prompt="You are an SEO specialist",
model_name="gpt-4o",
max_loops=1
)
]
# Create agent-task pairs
agent_task_pairs = [
(agents[0], "Write a blog post about sustainable living"),
(agents[1], "Edit and improve this article draft"),
(agents[2], "Optimize this content for SEO")
]
results = run_agents_with_different_tasks(agent_task_pairs, batch_size=2)
for i, result in enumerate(results):
    agent, task = agent_task_pairs[i]
    print(f"{agent.agent_name} - {task}: {result}")
Utility Functions
get_swarms_info()
Fetches and formats information about all available swarms in the system.
def get_swarms_info(swarms: List[Callable]) -> str
Parameters:
swarms (List[Callable]): List of swarm objects to get information about
Returns: Formatted string containing names and descriptions of all swarms
from swarms.structs.multi_agent_exec import get_swarms_info
swarms = [
# Your swarm objects here
]
info = get_swarms_info(swarms)
print(info)
# Output:
# Available Swarms:
#
# [Swarm 1]
# Name: ResearchSwarm
# Description: A swarm for research tasks
# Length of Agents: 3
# Swarm Type: hierarchical
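The output layout shown above could be produced by a formatter along these lines. This is purely illustrative: `SwarmSummary` and `format_swarms` are hypothetical, and the attribute names are assumptions, since the real swarm objects may expose different fields.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SwarmSummary:
    """Hypothetical record; real swarm objects may expose other attributes."""

    name: str
    description: str
    agent_count: int
    swarm_type: str


def format_swarms(swarms: List[SwarmSummary]) -> str:
    """Render one numbered section per swarm, separated by blank lines."""
    lines = ["Available Swarms:", ""]
    for index, swarm in enumerate(swarms, start=1):
        lines += [
            f"[Swarm {index}]",
            f"Name: {swarm.name}",
            f"Description: {swarm.description}",
            f"Length of Agents: {swarm.agent_count}",
            f"Swarm Type: {swarm.swarm_type}",
            "",
        ]
    return "\n".join(lines).rstrip()


report = format_swarms(
    [SwarmSummary("ResearchSwarm", "A swarm for research tasks", 3, "hierarchical")]
)
print(report)
```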
get_agents_info()
Fetches and formats information about all available agents in the system.
def get_agents_info(
agents: List[Union[Agent, Callable]],
team_name: str = None
) -> str
Parameters:
agents (List[Union[Agent, Callable]]): List of agent objects to get information about
team_name (str, optional): Optional team name to display
Returns: Formatted string containing names and descriptions of all agents
from swarms import Agent
from swarms.structs.multi_agent_exec import get_agents_info
agents = [
Agent(
agent_name="Research-Agent",
system_prompt="You are a research assistant",
model_name="gpt-4o",
max_loops=2,
role="Researcher"
),
Agent(
agent_name="Analysis-Agent",
system_prompt="You are a data analyst",
model_name="gpt-4o",
max_loops=1,
role="Analyst"
)
]
info = get_agents_info(agents, team_name="Data Team")
print(info)
Advanced Multi-Agent Workflow Example
from swarms import Agent
from swarms.structs.multi_agent_exec import (
run_agents_concurrently,
run_agents_with_different_tasks,
batched_grid_agent_execution,
get_agents_info
)
# Create specialized agents
agents = [
Agent(
agent_name="Market-Researcher",
system_prompt="You are a market research expert specializing in consumer behavior",
model_name="gpt-4o",
max_loops=1,
role="Researcher"
),
Agent(
agent_name="Data-Analyst",
system_prompt="You are a data analyst expert in statistical analysis",
model_name="gpt-4o",
max_loops=1,
role="Analyst"
),
Agent(
agent_name="Strategy-Consultant",
system_prompt="You are a strategy consultant specializing in business development",
model_name="gpt-4o",
max_loops=1,
role="Consultant"
),
Agent(
agent_name="Financial-Advisor",
system_prompt="You are a financial advisor specializing in investment strategies",
model_name="gpt-4o",
max_loops=1,
role="Advisor"
)
]
# Display agent information
print("=== Agent Information ===")
print(get_agents_info(agents, "Business Intelligence Team"))
# Same task for all agents (concurrent execution)
task = "Analyze the impact of remote work trends on commercial real estate market"
results = run_agents_concurrently(agents, task, max_workers=4)
# The list is in completion order, so results are not matched to agents by index
for i, result in enumerate(results):
    print(f"\nAnalysis {i+1}:")
    print(f"Result: {result}")
# Dictionary output format
results_dict = run_agents_concurrently(
agents, task, return_agent_output_dict=True, max_workers=4
)
for agent_name, result in results_dict.items():
    print(f"\n{agent_name} Analysis:")
    print(f"Result: {result}")
# Different tasks for different agents
agent_task_pairs = [
(agents[0], "Research consumer preferences for electric vehicles"),
(agents[1], "Analyze sales data for EV market penetration"),
(agents[2], "Develop marketing strategy for EV adoption"),
(agents[3], "Assess financial viability of EV charging infrastructure")
]
results = run_agents_with_different_tasks(agent_task_pairs, batch_size=2)
# Grid execution with matched agents and tasks
grid_agents = agents[:3]
grid_tasks = [
"Forecast market trends for renewable energy",
"Evaluate risk factors in green technology investments",
"Compare traditional vs sustainable investment portfolios"
]
grid_results = batched_grid_agent_execution(grid_agents, grid_tasks, max_workers=3)
Error Handling and Best Practices
from swarms import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
# Create agents
agents = [
Agent(
agent_name=f"Agent-{i}",
system_prompt="You are a helpful assistant",
model_name="gpt-4o",
max_loops=1
)
for i in range(5)
]
task = "Perform a complex analysis task"
try:
    results = run_agents_concurrently(agents, task, max_workers=4)
    # Handle results (some may be exceptions); the list is in completion order
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Result {i+1} failed with error: {result}")
        else:
            print(f"Result {i+1} succeeded: {result}")
except Exception as e:
    print(f"Execution failed: {e}")
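When a result list mixes outputs and exceptions, as in the example above, it can help to separate the two before further processing. The helper below is a small sketch; `split_results` is a hypothetical name and the sample list is made up for illustration.

```python
from typing import Any, List, Tuple


def split_results(
    results: List[Any],
) -> Tuple[List[Tuple[int, Any]], List[Tuple[int, Exception]]]:
    """Separate successful outputs from exceptions, keeping each list position."""
    successes: List[Tuple[int, Any]] = []
    failures: List[Tuple[int, Exception]] = []
    for index, result in enumerate(results):
        if isinstance(result, Exception):
            failures.append((index, result))
        else:
            successes.append((index, result))
    return successes, failures


# Made-up results standing in for the output of a concurrent run
mixed = ["report A", ValueError("rate limited"), "report C"]
ok, failed = split_results(mixed)

print(f"{len(ok)} succeeded, {len(failed)} failed")
for index, error in failed:
    print(f"Position {index} failed: {error!r}")
```

Keeping the position alongside each entry makes it possible to retry only the failed runs.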
| Technique | Best Use Case / Description |
|---|---|
| ThreadPoolExecutor | Best for I/O-bound work such as waiting on model API calls; supports image inputs and flexible output formats |
| Batch Processing | Prevents system overload with large numbers of agents, maintains order with grid execution |
| Resource Monitoring | Adjust worker counts based on system capabilities (defaults to 95% of CPU cores) |
| Async/Await | Use async functions for better concurrency control and platform optimizations |
| Image Support | Pass image data to agents that support multimodal processing for enhanced capabilities |
| Dictionary Output | Use return_agent_output_dict=True for structured results with agent name mapping |
| Error Handling | All functions include comprehensive exception handling with graceful fallbacks |
Best Practices
- Always handle exceptions in results, as some agents may fail
- Use appropriate max_workers based on system resources
- Monitor memory usage for large agent counts
- Consider batch processing for very large numbers of agents
- Use return_agent_output_dict=True for structured, named results
- Pass image data to agents that support multimodal processing
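One way to choose max_workers from system resources is to take a fraction of the CPU count, in the spirit of the 95% default mentioned earlier. The sketch below is an assumption about how such a value might be computed; `default_max_workers` is a hypothetical helper, and the library's exact rounding may differ.

```python
import os


def default_max_workers(fraction: float = 0.95) -> int:
    """Return a worker count as a fraction of CPU cores, never less than 1."""
    cores = os.cpu_count() or 1  # cpu_count() may return None
    return max(1, int(cores * fraction))


print(default_max_workers())     # depends on the machine
print(default_max_workers(0.5))  # a more conservative setting
```

The lower bound of 1 matters on single-core machines, where truncating 0.95 would otherwise give zero workers.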
Source Code
View the source code on GitHub