AsyncWorkflow Documentation
The `AsyncWorkflow` class represents an asynchronous workflow that executes tasks concurrently using multiple agents. It allows for efficient task management, leveraging Python's `asyncio` for concurrent execution.
Key Features
- Concurrent Task Execution: Distribute tasks across multiple agents asynchronously.
- Configurable Workers: Limit the number of concurrent workers (agents) for better resource management.
- Autosave Results: Optionally save the task execution results automatically.
- Verbose Logging: Enable detailed logging to monitor task execution.
- Error Handling: Gracefully handle exceptions raised by agents during task execution.
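
The worker limit and error handling listed above can be illustrated with a minimal sketch. It assumes the cap is enforced with an `asyncio.Semaphore` and that failures are returned as error strings; `run_bounded` and the demo coroutines are hypothetical names, and this is not taken from the `AsyncWorkflow` source.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_bounded(
    jobs: List[Callable[[], Awaitable[Any]]],
    max_workers: int = 5,
) -> List[Any]:
    """Run async jobs with at most `max_workers` in flight at once."""
    semaphore = asyncio.Semaphore(max_workers)

    async def guarded(job: Callable[[], Awaitable[Any]]) -> Any:
        async with semaphore:
            try:
                return await job()
            except Exception as exc:
                # Report the failure as a value so sibling jobs keep running.
                return f"Error: {exc}"

    # gather returns results in the same order as the input jobs.
    return await asyncio.gather(*(guarded(job) for job in jobs))


async def _demo() -> List[Any]:
    async def ok(n: int) -> int:
        await asyncio.sleep(0.01)
        return n * 2

    async def boom() -> int:
        raise RuntimeError("agent failed")

    jobs = [lambda: ok(1), boom, lambda: ok(3)]
    return await run_bounded(jobs, max_workers=2)


demo_results = asyncio.run(_demo())
print(demo_results)  # [2, 'Error: agent failed', 6]
```

Returning the error as a value keeps one failing job from discarding the results of the others, which matches the "results or error messages" contract described for `run`.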
Attributes
| Attribute | Type | Description |
|---|---|---|
| `name` | `str` | The name of the workflow. |
| `agents` | `List[Agent]` | A list of agents participating in the workflow. |
| `max_workers` | `int` | The maximum number of concurrent workers (default: `5`). |
| `dashboard` | `bool` | Whether to display a dashboard (currently not implemented). |
| `autosave` | `bool` | Whether to autosave task results (default: `False`). |
| `verbose` | `bool` | Whether to enable detailed logging (default: `False`). |
| `task_pool` | `List` | A pool of tasks to be executed. |
| `results` | `List` | A list to store results of executed tasks. |
| `loop` | `asyncio.EventLoop` | The event loop for asynchronous execution. |
__init__
Description:
Initializes the `AsyncWorkflow` with the specified agents, configuration, and options.
Parameters:
- `name` (`str`): Name of the workflow. Default: `"AsyncWorkflow"`.
- `agents` (`List[Agent]`): A list of agents. Default: `None`.
- `max_workers` (`int`): The maximum number of workers. Default: `5`.
- `dashboard` (`bool`): Enable dashboard visualization (placeholder for future implementation).
- `autosave` (`bool`): Enable autosave of task results. Default: `False`.
- `verbose` (`bool`): Enable detailed logging. Default: `False`.
- `**kwargs`: Additional parameters for `BaseWorkflow`.
_execute_agent_task
Description:
Executes a single task asynchronously using a given agent.
Parameters:
- `agent` (`Agent`): The agent responsible for executing the task.
- `task` (`str`): The task to be executed.
Returns:
- `Any`: The result of the task execution or an error message in case of an exception.
Example:
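
A minimal sketch of the behavior described above, under stated assumptions: the agent exposes a blocking `run(task)` method, the call is moved off the event loop with `asyncio.to_thread`, and an exception is turned into an error message. `EchoAgent` and `execute_agent_task` are hypothetical stand-ins, not the library's `Agent` class or its private method.

```python
import asyncio
from typing import Any


class EchoAgent:
    """Hypothetical stand-in for an agent exposing a blocking run(task)."""

    def __init__(self, agent_name: str, fail: bool = False) -> None:
        self.agent_name = agent_name
        self.fail = fail

    def run(self, task: str) -> str:
        if self.fail:
            raise RuntimeError("model unavailable")
        return f"{self.agent_name} finished: {task}"


async def execute_agent_task(agent: EchoAgent, task: str) -> Any:
    """Run one blocking agent call in a thread; return its result or an error message."""
    try:
        # to_thread keeps the event loop free while the blocking call runs.
        return await asyncio.to_thread(agent.run, task)
    except Exception as exc:
        return f"Error in {agent.agent_name}: {exc}"


ok_result = asyncio.run(execute_agent_task(EchoAgent("Agent1"), "Sample Task"))
err_result = asyncio.run(
    execute_agent_task(EchoAgent("Agent2", fail=True), "Sample Task")
)
print(ok_result)   # Agent1 finished: Sample Task
print(err_result)  # Error in Agent2: model unavailable
```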
run
Description:
Executes the specified task concurrently across all agents.
Parameters:
- `task` (`str`): The task to be executed by all agents.
Returns:
- `List[Any]`: A list of results or error messages returned by the agents.
Raises:
- `ValueError`: If no agents are provided in the workflow.
Example:
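import asyncio
from swarms.structs.agent import Agent
from swarms.structs.async_workflow import AsyncWorkflow

# Both agents receive the same task and run concurrently.
agents = [Agent(agent_name="Agent1"), Agent(agent_name="Agent2")]
workflow = AsyncWorkflow(agents=agents, verbose=True)
results = asyncio.run(workflow.run("Process Data"))
print(results)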
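
The contract described for `run` (one task sent to every agent, a `ValueError` when no agents exist, and errors reported in the result list) can be sketched without the library. This is an illustration under assumptions: agents are modeled as plain callables, and `run_all`, `summarize`, and `broken` are hypothetical names rather than part of the `AsyncWorkflow` API.

```python
import asyncio
from typing import Any, Callable, List


async def run_all(agents: List[Callable[[str], str]], task: str) -> List[Any]:
    """Send the same task to every agent and collect results in agent order."""
    if not agents:
        raise ValueError("No agents provided to the workflow")

    calls = [asyncio.to_thread(agent, task) for agent in agents]
    # return_exceptions=True stores a raised exception in that agent's slot
    # instead of aborting the whole batch.
    raw = await asyncio.gather(*calls, return_exceptions=True)
    return [f"Error: {r}" if isinstance(r, Exception) else r for r in raw]


def summarize(task: str) -> str:
    return f"summary of {task}"


def broken(task: str) -> str:
    raise RuntimeError("timeout")


results = asyncio.run(run_all([summarize, broken], "Process Data"))
print(results)  # ['summary of Process Data', 'Error: timeout']

try:
    asyncio.run(run_all([], "Process Data"))
    empty_error = None
except ValueError as exc:
    empty_error = str(exc)
print(empty_error)  # No agents provided to the workflow
```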
Production-Grade Financial Example: Multiple Agents
Example: Stock Analysis and Investment Strategy
import asyncio
from typing import List

from swarm_models import OpenAIChat
from swarms.structs.async_workflow import (
    SpeakerConfig,
    SpeakerRole,
    create_default_workflow,
    run_workflow_with_retry,
)
from swarms.prompts.finance_agent_sys_prompt import (
    FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.agent import Agent


async def create_specialized_agents() -> List[Agent]:
    """Create a set of specialized agents for financial analysis"""
    # Base model configuration
    model = OpenAIChat(model_name="gpt-4o")

    # Financial Analysis Agent
    financial_agent = Agent(
        agent_name="Financial-Analysis-Agent",
        agent_description="Personal finance advisor agent",
        system_prompt=FINANCIAL_AGENT_SYS_PROMPT
        + "Output the <DONE> token when you're done creating a portfolio of etfs, index, funds, and more for AI",
        max_loops=1,
        llm=model,
        dynamic_temperature_enabled=True,
        user_name="Kye",
        retry_attempts=3,
        context_length=8192,
        return_step_meta=False,
        output_type="str",
        auto_generate_prompt=False,
        max_tokens=4000,
        stopping_token="<DONE>",
        saved_state_path="financial_agent.json",
        interactive=False,
    )

    # Risk Assessment Agent
    risk_agent = Agent(
        agent_name="Risk-Assessment-Agent",
        agent_description="Investment risk analysis specialist",
        system_prompt="Analyze investment risks and provide risk scores. Output <DONE> when analysis is complete.",
        max_loops=1,
        llm=model,
        dynamic_temperature_enabled=True,
        user_name="Kye",
        retry_attempts=3,
        context_length=8192,
        output_type="str",
        max_tokens=4000,
        stopping_token="<DONE>",
        saved_state_path="risk_agent.json",
        interactive=False,
    )

    # Market Research Agent
    research_agent = Agent(
        agent_name="Market-Research-Agent",
        agent_description="AI and tech market research specialist",
        system_prompt="Research AI market trends and growth opportunities. Output <DONE> when research is complete.",
        max_loops=1,
        llm=model,
        dynamic_temperature_enabled=True,
        user_name="Kye",
        retry_attempts=3,
        context_length=8192,
        output_type="str",
        max_tokens=4000,
        stopping_token="<DONE>",
        saved_state_path="research_agent.json",
        interactive=False,
    )

    return [financial_agent, risk_agent, research_agent]


async def main():
    # Create specialized agents
    agents = await create_specialized_agents()

    # Create workflow with group chat enabled
    workflow = create_default_workflow(
        agents=agents,
        name="AI-Investment-Analysis-Workflow",
        enable_group_chat=True,
    )

    # Configure speaker roles
    workflow.speaker_system.add_speaker(
        SpeakerConfig(
            role=SpeakerRole.COORDINATOR,
            agent=agents[0],  # Financial agent as coordinator
            priority=1,
            concurrent=False,
            required=True,
        )
    )
    workflow.speaker_system.add_speaker(
        SpeakerConfig(
            role=SpeakerRole.CRITIC,
            agent=agents[1],  # Risk agent as critic
            priority=2,
            concurrent=True,
        )
    )
    workflow.speaker_system.add_speaker(
        SpeakerConfig(
            role=SpeakerRole.EXECUTOR,
            agent=agents[2],  # Research agent as executor
            priority=2,
            concurrent=True,
        )
    )

    # Investment analysis task
    investment_task = """
    Create a comprehensive investment analysis for a $40k portfolio focused on AI growth opportunities:
    1. Identify high-growth AI ETFs and index funds
    2. Analyze risks and potential returns
    3. Create a diversified portfolio allocation
    4. Provide market trend analysis
    Present the results in a structured markdown format.
    """

    try:
        # Run workflow with retry
        result = await run_workflow_with_retry(
            workflow=workflow, task=investment_task, max_retries=3
        )

        print("\nWorkflow Results:")
        print("================")

        # Process and display agent outputs
        for output in result.agent_outputs:
            print(f"\nAgent: {output.agent_name}")
            print("-" * (len(output.agent_name) + 8))
            print(output.output)

        # Display group chat history if enabled
        if workflow.enable_group_chat:
            print("\nGroup Chat Discussion:")
            print("=====================")
            for msg in workflow.speaker_system.message_history:
                print(f"\n{msg.role} ({msg.agent_name}):")
                print(msg.content)

        # Save detailed results
        if result.metadata.get("shared_memory_keys"):
            print("\nShared Insights:")
            print("===============")
            for key in result.metadata["shared_memory_keys"]:
                value = workflow.shared_memory.get(key)
                if value:
                    print(f"\n{key}:")
                    print(value)

    except Exception as e:
        print(f"Workflow failed: {str(e)}")

    finally:
        await workflow.cleanup()


if __name__ == "__main__":
    # Run the example
    asyncio.run(main())
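
The example relies on `run_workflow_with_retry` without showing how retries work. As a rough illustration of the general pattern only, a retry wrapper around an async operation might look like the sketch below. `retry_async`, `base_delay`, the exponential backoff policy, and the reading of `max_retries` as the total number of attempts are all assumptions, not the library's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def retry_async(
    operation: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 0.01,
) -> Any:
    """Await `operation`, trying up to `max_retries` times with exponential backoff."""
    last_error: Optional[Exception] = None
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception as exc:
            last_error = exc
            if attempt < max_retries - 1:
                # Wait base_delay, then 2x, 4x, ... before the next attempt.
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"failed after {max_retries} attempts") from last_error


calls = {"count": 0}


async def flaky() -> str:
    """Hypothetical operation that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "analysis complete"


retry_result = asyncio.run(retry_async(flaky, max_retries=3))
print(retry_result, calls["count"])  # analysis complete 3
```

Chaining the final `RuntimeError` to the last exception with `from` keeps the original cause visible in the traceback.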