Overview
Themulti_agent_exec module provides a comprehensive set of utility functions for running multiple agents using various execution strategies. It includes synchronous and asynchronous execution methods, concurrent batch processing, and utility functions for information retrieval.
Installation
Function Overview
| Function | Category | Description |
|---|---|---|
run_single_agent | Single Agent | Runs a single agent synchronously |
run_agent_async | Single Agent | Runs a single agent asynchronously using asyncio |
run_agents_concurrently_async | Concurrent | Runs multiple agents concurrently using asyncio |
run_agents_concurrently | Concurrent | Optimized concurrent runner with image support and flexible output formats |
run_agents_concurrently_multiprocess | Concurrent | Manages agents concurrently in batches with optimized performance |
batched_grid_agent_execution | Batched & Grid | Runs multiple agents with different tasks concurrently |
run_agents_with_different_tasks | Batched & Grid | Runs agents with different tasks concurrently in batches |
get_swarms_info | Utility | Fetches and formats information about available swarms |
get_agents_info | Utility | Fetches and formats information about available agents |
Single Agent Functions
run_single_agent()
Runs a single agent synchronously.agent(AgentType): Agent instance to runtask(str): Task string to execute*args(Any): Additional positional arguments**kwargs(Any): Additional keyword arguments
run_agent_async()
Runs a single agent asynchronously using asyncio.agent(AgentType): Agent instance to runtask(str): Task string to execute
Concurrent Execution Functions
run_agents_concurrently_async()
Runs multiple agents concurrently using asyncio.agents(List[AgentType]): List of Agent instances to run concurrentlytask(str): Task string to execute by all agents
run_agents_concurrently()
Optimized concurrent agent runner using ThreadPoolExecutor with image support and flexible output formats.agents(List[AgentType]): List of Agent instances to run concurrentlytask(str): Task string to executeimg(Optional[str]): Optional image data to pass to agentrun()if supportedmax_workers(Optional[int]): Maximum number of threads in the executor. Defaults to 95% of CPU coresreturn_agent_output_dict(bool): If True, returns a dict mapping agent names to outputs
- If
return_agent_output_dict=False: List of outputs from each agent in completion order (exceptions included if agents fail) - If
return_agent_output_dict=True: Dictionary mapping agent names to outputs, preserving agent input order
run_agents_concurrently_multiprocess()
Manages and runs multiple agents concurrently in batches with optimized performance.agents(List[Agent]): List of Agent instances to run concurrentlytask(str): Task string to execute by all agentsbatch_size(int): Number of agents to run in parallel in each batch. Defaults to CPU count
Batched and Grid Execution
batched_grid_agent_execution()
Runs multiple agents with different tasks concurrently using batched grid execution.agents(List[AgentType]): List of agent instancestasks(List[str]): List of tasks, one for each agentmax_workers(int): Maximum number of threads to use. Defaults to 90% of CPU cores
ValueError if the number of agents doesn’t match the number of tasks
batch_agent_execution()
Runs a list of agents on a parallel list of tasks. Eachagents[i] runs tasks[i] — unlike batched_grid_agent_execution (which runs every agent on every task), pairings are 1:1.
agents(List[Agent | Callable]): Agents to run.tasks(List[str]): One task per agent. Must have the same length asagents.imgs(List[str]): One optional image input per agent. Must be the same length asagentswhen provided.max_workers(int): Thread pool size. Defaults to ~90% of CPU cores.
None in that slot.
Raises: BatchAgentExecutionError wrapping any internal failure. Mismatched len(agents) vs len(tasks) is wrapped from the inner ValueError.
run_agents_concurrently (every agent runs the same task) and batched_grid_agent_execution (every agent runs every task) to cover the three common shapes of “run a bunch of agents”.
run_agents_with_different_tasks()
Runs multiple agents with different tasks concurrently, processing them in batches.agent_task_pairs(List[tuple[AgentType, str]]): List of (agent, task) tuplesbatch_size(int): Number of agents to run in parallel in each batch. Default: 10max_workers(int): Maximum number of threads
Utility Functions
get_swarms_info()
Fetches and formats information about all available swarms in the system.swarms(List[Callable]): List of swarm objects to get information about
get_agents_info()
Fetches and formats information about all available agents in the system.agents(List[Union[Agent, Callable]]): List of agent objects to get information aboutteam_name(str, optional): Optional team name to display
Advanced Multi-Agent Workflow Example
Error Handling and Best Practices
Performance Considerations
| Technique | Best Use Case / Description |
|---|---|
| ThreadPoolExecutor | Best for CPU-bound tasks with moderate I/O, supports image processing and flexible output formats |
| Batch Processing | Prevents system overload with large numbers of agents, maintains order with grid execution |
| Resource Monitoring | Adjust worker counts based on system capabilities (defaults to 95% of CPU cores) |
| Async/Await | Use async functions for better concurrency control and platform optimizations |
| Image Support | Pass image data to agents that support multimodal processing for enhanced capabilities |
| Dictionary Output | Use return_agent_output_dict=True for structured results with agent name mapping |
| Error Handling | All functions include comprehensive exception handling with graceful fallbacks |
Best Practices
- Always handle exceptions in results, as some agents may fail
- Use appropriate
max_workersbased on system resources - Monitor memory usage for large agent counts
- Consider batch processing for very large numbers of agents
- Use
return_agent_output_dict=Truefor structured, named results - Pass image data to agents that support multimodal processing