## Overview

The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It leverages a function-calling architecture to analyze tasks and recommend the optimal model and provider combination for each specific use case.
## Key Features

- Dynamic model selection based on task complexity and requirements
- Multi-provider support (OpenAI, Anthropic, Google, etc.)
- Concurrent and asynchronous execution capabilities
- Batch processing with memory
- Automatic error handling and retries
- Provider-aware routing
- Cost optimization
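The core routing idea can be illustrated with a small sketch. This is not the library's actual implementation; the catalog contents, capability tags, and cost values below are hypothetical, and a function-calling model performs the real analysis. The sketch only shows the shape of a provider-aware, cost-optimizing selection step:

```python
# Hypothetical model catalog: capability tags and relative cost per model.
MODEL_CATALOG = {
    "gpt-4-turbo": {"provider": "openai", "strengths": {"reasoning", "code"}, "cost": 3},
    "claude-3-opus": {"provider": "anthropic", "strengths": {"research", "long-form"}, "cost": 3},
    "mistral-large": {"provider": "mistral", "strengths": {"general", "classification"}, "cost": 1},
}

def route(task: str, required: set[str]) -> str:
    """Return the cheapest catalog model covering all required capabilities."""
    candidates = [
        (spec["cost"], name)
        for name, spec in MODEL_CATALOG.items()
        if required <= spec["strengths"]  # model covers every required tag
    ]
    if not candidates:
        # No model covers the requirements: fall back to the cheapest model.
        return min(MODEL_CATALOG, key=lambda n: MODEL_CATALOG[n]["cost"])
    return min(candidates)[1]  # lowest cost wins
```

In the real router this decision is delegated to an LLM via function calling rather than a static lookup, but the output is the same kind of (model, provider) recommendation.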
## Installation

1. Install the latest version of swarms:

```shell
pip install -U swarms
```

2. Set up your API keys in your `.env` file:

```shell
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
GOOGLE_API_KEY=your_google_api_key
# Add more API keys as needed following litellm format
```
## Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| `system_prompt` | `str` | `model_router_system_prompt` | Custom prompt for guiding model selection behavior. |
| `max_tokens` | `int` | - | Maximum token limit for model outputs. |
| `temperature` | `float` | - | Control parameter for response randomness (0.0-1.0). |
| `max_workers` | `int` or `"auto"` | - | Maximum concurrent workers. Use `"auto"` to match the CPU count. |
| `api_key` | `str` | - | API key for model access. |
| `max_loops` | `int` | - | Maximum number of refinement iterations. |
## Methods

### run()

Executes a single task through the model router, with memory and refinement capabilities.

```python
def run(self, task: str) -> str
```

Parameters:

- `task` (str): The task to be executed

Returns: `str` - The result of task execution

### batch_run()

Executes multiple tasks sequentially and aggregates the results.

```python
def batch_run(self, tasks: list) -> list
```

Parameters:

- `tasks` (list): List of task strings to be executed

Returns: `list` - List of results, one for each task

### concurrent_run()

Executes multiple tasks in parallel using a thread pool.

```python
def concurrent_run(self, tasks: list) -> list
```

Parameters:

- `tasks` (list): List of task strings to be executed

Returns: `list` - List of results from parallel execution

### async_run()

Executes a task asynchronously with coroutine support.

```python
async def async_run(self, task: str) -> asyncio.Task
```

Parameters:

- `task` (str): The task to be executed

Returns: `asyncio.Task` - An asyncio Task object that can be awaited or gathered
## Usage Examples

### Basic Usage

```python
from swarms import ModelRouter

router = ModelRouter()

# Simple text analysis
result = router.run("Analyze the sentiment and key themes in this customer feedback")

# Complex reasoning task
complex_result = router.run("""
Evaluate the following business proposal:
- Initial investment: $500,000
- Projected ROI: 25% annually
- Market size: $2B
- Competition: 3 major players
Provide detailed analysis and recommendations.
""")
```
### Batch Processing

```python
from swarms import ModelRouter

router = ModelRouter()

# Multiple analysis tasks
tasks = [
    "Analyze Q1 financial performance",
    "Predict Q2 market trends",
    "Evaluate competitor strategies",
    "Generate growth recommendations",
]

results = router.batch_run(tasks)

# Process results
for task, result in zip(tasks, results):
    print(f"Task: {task}\nResult: {result}\n")
```
### Concurrent Execution

```python
from swarms import ModelRouter

router = ModelRouter()

# Define multiple concurrent tasks
analysis_tasks = [
    "Perform technical analysis of AAPL stock",
    "Analyze market sentiment from social media",
    "Generate trading signals",
    "Calculate risk metrics",
]

# Execute tasks concurrently
results = router.concurrent_run(analysis_tasks)

# Process results with error handling
# (process_analysis, save_to_database, and log_error are user-defined helpers)
for task, result in zip(analysis_tasks, results):
    try:
        processed_result = process_analysis(result)
        save_to_database(processed_result)
    except Exception as e:
        log_error(f"Error processing {task}: {str(e)}")
```
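The key property of this execution mode is that results come back in the same order as the input tasks, even though the calls overlap in time. This is not swarms' actual source, but the semantics can be sketched with the stdlib thread pool the docs list as a dependency:

```python
from concurrent.futures import ThreadPoolExecutor

def concurrent_map(fn, tasks, max_workers=4):
    """Run fn over tasks in a thread pool, preserving input order.
    Executor.map yields results in submission order even though the
    underlying calls run concurrently."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, tasks))
```

Order preservation is what makes the `zip(analysis_tasks, results)` pairing above safe.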
### Asynchronous Execution

```python
import asyncio

from swarms import ModelRouter

async def process_data_stream():
    router = ModelRouter()
    tasks = []
    # data_stream is assumed to be an async iterable supplied by the caller
    async for data in data_stream:
        task = await router.async_run(f"Process data: {data}")
        tasks.append(task)
    results = await asyncio.gather(*tasks)
    return results

# Usage in async context
async def main():
    results = await process_data_stream()
```
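The pattern above hinges on `async_run` returning an `asyncio.Task` (per its signature): awaiting it schedules the work, and the resulting tasks are gathered later. The flow can be demonstrated self-contained with a stub standing in for the router (the stub and its names are illustrative, not the library's internals):

```python
import asyncio

# Stub standing in for ModelRouter.async_run: like the documented method,
# awaiting it returns an asyncio.Task that resolves to the result string.
async def fake_async_run(task: str) -> asyncio.Task:
    async def _work() -> str:
        await asyncio.sleep(0)  # placeholder for the actual model call
        return f"result for: {task}"
    return asyncio.ensure_future(_work())

async def demo() -> list:
    # Schedule all work first, then collect results in input order.
    tasks = [await fake_async_run(t) for t in ("a", "b")]
    return await asyncio.gather(*tasks)

results = asyncio.run(demo())
```

Scheduling every task before gathering is what lets the model calls overlap instead of running one after another.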
### Financial Analysis System

```python
from typing import Dict

from swarms import ModelRouter

class FinancialAnalysisSystem:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.3,  # Lower temperature for more deterministic outputs
            max_tokens=8000,  # Higher token limit for detailed analysis
            max_loops=2,      # Allow for refinement iteration
        )

    def analyze_company_financials(self, financial_data: Dict) -> Dict:
        analysis_task = f"""
        Perform comprehensive financial analysis:

        Financial Metrics:
        - Revenue: ${financial_data['revenue']}M
        - EBITDA: ${financial_data['ebitda']}M
        - Debt/Equity: {financial_data['debt_equity']}
        - Working Capital: ${financial_data['working_capital']}M

        Required Analysis:
        1. Profitability assessment
        2. Liquidity analysis
        3. Growth projections
        4. Risk evaluation
        5. Investment recommendations

        Provide detailed insights and actionable recommendations.
        """
        result = self.router.run(analysis_task)
        return self._parse_analysis_result(result)

    def _parse_analysis_result(self, result: str) -> Dict:
        # Implementation of result parsing
        pass

# Usage
analyzer = FinancialAnalysisSystem()
company_data = {
    'revenue': 150,
    'ebitda': 45,
    'debt_equity': 0.8,
    'working_capital': 25,
}
analysis = analyzer.analyze_company_financials(company_data)
```
### Healthcare Data Processing Pipeline

```python
import asyncio
from typing import Dict, List

from swarms import ModelRouter

class MedicalDataProcessor:
    def __init__(self):
        self.router = ModelRouter(
            max_workers="auto",  # Automatic worker scaling
            temperature=0.2,     # Conservative temperature for medical analysis
            system_prompt="""You are a specialized medical data analyzer focused on:
            1. Clinical terminology interpretation
            2. Patient data analysis
            3. Treatment recommendation review
            4. Medical research synthesis""",
        )

    async def process_patient_records(self, records: List[Dict]) -> List[Dict]:
        analysis_tasks = []
        for record in records:
            task = f"""
            Analyze patient record:
            - Age: {record['age']}
            - Symptoms: {', '.join(record['symptoms'])}
            - Vital Signs: {record['vitals']}
            - Medications: {', '.join(record['medications'])}
            - Lab Results: {record['lab_results']}

            Provide:
            1. Symptom analysis
            2. Medication interaction check
            3. Lab results interpretation
            4. Treatment recommendations
            """
            analysis_tasks.append(task)

        results = await asyncio.gather(*[
            self.router.async_run(task) for task in analysis_tasks
        ])
        return [self._parse_medical_analysis(r) for r in results]

    def _parse_medical_analysis(self, analysis: str) -> Dict:
        # Implementation of medical analysis parsing
        pass

# Usage
async def main():
    processor = MedicalDataProcessor()
    patient_records = [
        {
            'age': 45,
            'symptoms': ['fever', 'cough', 'fatigue'],
            'vitals': {'bp': '120/80', 'temp': '38.5C'},
            'medications': ['lisinopril', 'metformin'],
            'lab_results': 'WBC: 11,000, CRP: 2.5',
        },
        # More records...
    ]
    analyses = await processor.process_patient_records(patient_records)
```
### NLP Processing Pipeline

```python
from typing import Dict, List

from swarms import ModelRouter

class NLPPipeline:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.4,
            max_loops=2,
        )

    def process_documents(self, documents: List[str]) -> List[Dict]:
        tasks = [self._create_nlp_task(doc) for doc in documents]
        results = self.router.concurrent_run(tasks)
        return [self._parse_nlp_result(r) for r in results]

    def _create_nlp_task(self, document: str) -> str:
        return f"""
        Perform comprehensive NLP analysis:

        Text: {document}

        Required Analysis:
        1. Entity recognition
        2. Sentiment analysis
        3. Topic classification
        4. Key phrase extraction
        5. Intent detection

        Provide structured analysis with confidence scores.
        """

    def _parse_nlp_result(self, result: str) -> Dict:
        # Implementation of NLP result parsing
        pass

# Usage
pipeline = NLPPipeline()
documents = [
    "We're extremely satisfied with the new product features!",
    "The customer service response time needs improvement.",
    "Looking to upgrade our subscription plan next month.",
]
analyses = pipeline.process_documents(documents)
```
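The `_parse_nlp_result` stub above is left to the reader. One hedged way to fill it in, assuming the model is asked to reply with simple `Label: value` lines (real model output varies, so production code should validate with pydantic or request JSON output instead):

```python
def parse_nlp_result(result: str) -> dict:
    """Illustrative parser for 'Label: value' lines in a model response.
    Labels are normalized to snake_case keys; lines without a colon are skipped."""
    parsed = {}
    for line in result.splitlines():
        if ":" in line:
            label, _, value = line.partition(":")
            parsed[label.strip().lower().replace(" ", "_")] = value.strip()
    return parsed
```

A schema-validated approach (e.g. a pydantic model plus a "respond in JSON" instruction in the task prompt) is more robust than line parsing for anything beyond a demo.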
## Available Models and Use Cases

| Model | Provider | Optimal Use Cases | Characteristics |
|---|---|---|---|
| `gpt-4-turbo` | OpenAI | Complex reasoning, Code generation, Creative writing | High accuracy, Latest knowledge cutoff |
| `claude-3-opus` | Anthropic | Research analysis, Technical documentation, Long-form content | Strong reasoning, Detailed outputs |
| `gemini-pro` | Google | Multimodal tasks, Code generation, Technical analysis | Fast inference, Strong coding abilities |
| `mistral-large` | Mistral | General tasks, Content generation, Classification | Open source, Good price/performance |
| `deepseek-reasoner` | DeepSeek | Mathematical analysis, Logic problems, Scientific computing | Specialized reasoning capabilities |
## Provider Capabilities

| Provider | Strengths | Best For | Integration Notes |
|---|---|---|---|
| OpenAI | Consistent performance, Strong reasoning | Production systems, Complex tasks | Requires API key setup |
| Anthropic | Safety features, Detailed analysis | Research, Technical writing | Claude-specific formatting |
| Google | Technical tasks, Multimodal support | Code generation, Analysis | Vertex AI integration available |
| Groq | High-speed inference | Real-time applications | Optimized for specific models |
| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration |
| Mistral | Open source flexibility | General applications | Self-hosted options available |
## Best Practices

1. **Token Management**
   - Set an appropriate `max_tokens` value based on task complexity
   - Monitor token usage for cost optimization
   - Use streaming for long outputs

2. **Concurrency Settings**
   - Adjust `max_workers` based on system resources
   - Use `"auto"` workers for optimal CPU utilization
   - Monitor memory usage with large batch sizes

3. **Temperature Tuning**
   - Lower (0.1-0.3) for factual/analytical tasks
   - Higher (0.7-0.9) for creative tasks
   - Mid-range (0.4-0.6) for balanced outputs

4. **System Prompts**
   - Customize for specific domains
   - Include relevant context
   - Define clear output formats
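The practices above can be applied mechanically at configuration time. A hypothetical helper (the preset names and values below are illustrative choices drawn from the ranges listed, not library defaults):

```python
import os

# Temperature presets matching the tuning guidance above.
TEMPERATURE_PRESETS = {"analytical": 0.2, "balanced": 0.5, "creative": 0.8}

def resolve_workers(max_workers) -> int:
    """Resolve the max_workers setting: "auto" maps to the CPU count."""
    if max_workers == "auto":
        return os.cpu_count() or 1
    return int(max_workers)

def pick_temperature(task_type: str) -> float:
    """Pick a temperature preset for a task type, defaulting to balanced."""
    return TEMPERATURE_PRESETS.get(task_type, TEMPERATURE_PRESETS["balanced"])
```

The resolved values can then be passed directly to `ModelRouter(max_workers=..., temperature=...)`.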
## Dependencies

- `asyncio`: Asynchronous I/O support
- `concurrent.futures`: Thread pool execution
- `pydantic`: Data validation
- `litellm`: LLM interface standardization
## Source Code

View the source code on GitHub.