# ModelRouter Docs

The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It uses a function-calling architecture to analyze each task and recommend the model and provider combination best suited to it.
## Key Features
- Dynamic model selection based on task complexity and requirements
- Multi-provider support (OpenAI, Anthropic, Google, etc.)
- Concurrent and asynchronous execution capabilities
- Batch processing with conversation memory
- Automatic error handling and retries
- Provider-aware routing
- Cost optimization
## Constructor Arguments

| Parameter | Type | Default | Description |
|---|---|---|---|
| system_prompt | str | model_router_system_prompt | Custom prompt for guiding model selection behavior |
| max_tokens | int | 4000 | Maximum token limit for model outputs |
| temperature | float | 0.5 | Controls response randomness (0.0-1.0) |
| max_workers | int/str | 10 | Maximum concurrent workers ("auto" for CPU count) |
| api_key | str | None | API key for model access |
| max_loops | int | 1 | Maximum number of refinement iterations |
| *args | Any | None | Additional positional arguments |
| **kwargs | Any | None | Additional keyword arguments |
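For example, a router can be configured explicitly rather than relying on the defaults (the values below are illustrative, not recommendations):

```python
from swarms import ModelRouter

# Every argument is optional; anything omitted falls back to the defaults above
router = ModelRouter(
    max_tokens=4000,
    temperature=0.5,
    max_workers=10,
    max_loops=1,
)
```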
## Installation
- Install the latest version of swarms using pip:

```bash
pip3 install -U swarms
```

- Set up your API keys in a .env file:

```bash
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
GOOGLE_API_KEY=your_google_api_key
# Add more API keys as needed, following the litellm naming format
```
## Core Methods

### run(task: str) -> str

Executes a single task through the model router, applying conversation memory and up to max_loops refinement iterations.

```python
from swarms import ModelRouter

router = ModelRouter()
# Simple text analysis
result = router.run("Analyze the sentiment and key themes in this customer feedback")
# Complex reasoning task
complex_result = router.run("""
Evaluate the following business proposal:
- Initial investment: $500,000
- Projected ROI: 25% annually
- Market size: $2B
- Competition: 3 major players
Provide detailed analysis and recommendations.
""")
### batch_run(tasks: list) -> list

Executes multiple tasks sequentially with result aggregation.

```python
# Multiple analysis tasks
tasks = [
    "Analyze Q1 financial performance",
    "Predict Q2 market trends",
    "Evaluate competitor strategies",
    "Generate growth recommendations"
]

results = router.batch_run(tasks)

# Process results
for task, result in zip(tasks, results):
    print(f"Task: {task}\nResult: {result}\n")
```
### concurrent_run(tasks: list) -> list

Executes multiple tasks in parallel using a thread pool.

```python
# Define multiple concurrent tasks
analysis_tasks = [
    "Perform technical analysis of AAPL stock",
    "Analyze market sentiment from social media",
    "Generate trading signals",
    "Calculate risk metrics"
]

# Execute tasks concurrently
results = router.concurrent_run(analysis_tasks)

# Process results with error handling
# (process_analysis, save_to_database, and log_error are placeholders
# for functions you define in your own application)
for task, result in zip(analysis_tasks, results):
    try:
        processed_result = process_analysis(result)
        save_to_database(processed_result)
    except Exception as e:
        log_error(f"Error processing {task}: {str(e)}")
```
### async_run(task: str) -> asyncio.Task

Asynchronous task execution with coroutine support. Because async_run returns an awaitable task, you can schedule many tasks and await them together:

```python
import asyncio

from swarms import ModelRouter

async def process_data_stream(router, data_stream):
    # data_stream is a placeholder async iterable you would supply
    tasks = []
    async for data in data_stream:
        # Schedule now; results are awaited together below
        tasks.append(router.async_run(f"Process data: {data}"))
    results = await asyncio.gather(*tasks)
    return results

# Usage in an async context
async def main(data_stream):
    router = ModelRouter()
    return await process_data_stream(router, data_stream)
```
## Advanced Usage Examples

### Financial Analysis System

```python
from typing import Dict

from swarms import ModelRouter

class FinancialAnalysisSystem:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.3,  # Lower temperature for more deterministic outputs
            max_tokens=8000,  # Higher token limit for detailed analysis
            max_loops=2       # Allow for a refinement iteration
        )

    def analyze_company_financials(self, financial_data: Dict) -> Dict:
        analysis_task = f"""
        Perform comprehensive financial analysis:

        Financial Metrics:
        - Revenue: ${financial_data['revenue']}M
        - EBITDA: ${financial_data['ebitda']}M
        - Debt/Equity: {financial_data['debt_equity']}
        - Working Capital: ${financial_data['working_capital']}M

        Required Analysis:
        1. Profitability assessment
        2. Liquidity analysis
        3. Growth projections
        4. Risk evaluation
        5. Investment recommendations

        Provide detailed insights and actionable recommendations.
        """
        result = self.router.run(analysis_task)
        return self._parse_analysis_result(result)

    def _parse_analysis_result(self, result: str) -> Dict:
        # Implementation of result parsing (see the sketch below)
        pass

# Usage
analyzer = FinancialAnalysisSystem()
company_data = {
    'revenue': 150,
    'ebitda': 45,
    'debt_equity': 0.8,
    'working_capital': 25
}
analysis = analyzer.analyze_company_financials(company_data)
```
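The _parse_analysis_result step is left unimplemented above; a minimal sketch, assuming the model echoes the numbered headings requested in the prompt (the helper name and regex are illustrative, not part of the swarms API):

```python
import re
from typing import Dict

def parse_numbered_sections(result: str) -> Dict[str, str]:
    # Illustrative only: split a response on numbered headings like
    # "1. Profitability assessment" into a {title: body} mapping
    sections: Dict[str, str] = {}
    for chunk in re.split(r"\n\s*\d+\.\s+", "\n" + result)[1:]:
        heading, _, body = chunk.partition("\n")
        sections[heading.strip()] = body.strip()
    return sections
```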
### Healthcare Data Processing Pipeline

```python
import asyncio
from typing import List, Dict

from swarms import ModelRouter

class MedicalDataProcessor:
    def __init__(self):
        self.router = ModelRouter(
            max_workers="auto",  # Automatic worker scaling
            temperature=0.2,     # Conservative temperature for medical analysis
            system_prompt="""You are a specialized medical data analyzer focused on:
            1. Clinical terminology interpretation
            2. Patient data analysis
            3. Treatment recommendation review
            4. Medical research synthesis"""
        )

    async def process_patient_records(self, records: List[Dict]) -> List[Dict]:
        analysis_tasks = []
        for record in records:
            task = f"""
            Analyze patient record:
            - Age: {record['age']}
            - Symptoms: {', '.join(record['symptoms'])}
            - Vital Signs: {record['vitals']}
            - Medications: {', '.join(record['medications'])}
            - Lab Results: {record['lab_results']}

            Provide:
            1. Symptom analysis
            2. Medication interaction check
            3. Lab results interpretation
            4. Treatment recommendations
            """
            analysis_tasks.append(task)

        results = await asyncio.gather(*[
            self.router.async_run(task) for task in analysis_tasks
        ])
        return [self._parse_medical_analysis(r) for r in results]

    def _parse_medical_analysis(self, analysis: str) -> Dict:
        # Implementation of medical analysis parsing
        pass

# Usage
async def main():
    processor = MedicalDataProcessor()
    patient_records = [
        {
            'age': 45,
            'symptoms': ['fever', 'cough', 'fatigue'],
            'vitals': {'bp': '120/80', 'temp': '38.5C'},
            'medications': ['lisinopril', 'metformin'],
            'lab_results': 'WBC: 11,000, CRP: 2.5'
        }
        # More records...
    ]
    analyses = await processor.process_patient_records(patient_records)
    return analyses

asyncio.run(main())
```
### Natural Language Processing Pipeline

```python
from typing import List, Dict

from swarms import ModelRouter

class NLPPipeline:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.4,
            max_loops=2
        )

    def process_documents(self, documents: List[str]) -> List[Dict]:
        tasks = [self._create_nlp_task(doc) for doc in documents]
        results = self.router.concurrent_run(tasks)
        return [self._parse_nlp_result(r) for r in results]

    def _create_nlp_task(self, document: str) -> str:
        return f"""
        Perform comprehensive NLP analysis:

        Text: {document}

        Required Analysis:
        1. Entity recognition
        2. Sentiment analysis
        3. Topic classification
        4. Key phrase extraction
        5. Intent detection

        Provide structured analysis with confidence scores.
        """

    def _parse_nlp_result(self, result: str) -> Dict:
        # Implementation of NLP result parsing
        pass

# Usage
pipeline = NLPPipeline()
documents = [
    "We're extremely satisfied with the new product features!",
    "The customer service response time needs improvement.",
    "Looking to upgrade our subscription plan next month."
]
analyses = pipeline.process_documents(documents)
```
## Available Models and Use Cases

| Model | Provider | Optimal Use Cases | Characteristics |
|---|---|---|---|
| gpt-4-turbo | OpenAI | Complex reasoning, Code generation, Creative writing | High accuracy, Latest knowledge cutoff |
| claude-3-opus | Anthropic | Research analysis, Technical documentation, Long-form content | Strong reasoning, Detailed outputs |
| gemini-pro | Google | Multimodal tasks, Code generation, Technical analysis | Fast inference, Strong coding abilities |
| mistral-large | Mistral | General tasks, Content generation, Classification | Open source, Good price/performance |
| deepseek-reasoner | DeepSeek | Mathematical analysis, Logic problems, Scientific computing | Specialized reasoning capabilities |
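You do not select a model directly; the router is expected to match tasks like the following to the rows above (actual selections depend on the router's analysis of each task):

```python
# Tasks spanning several of the use cases above; the router picks
# a suitable model/provider for each one
results = router.batch_run([
    "Prove that the sum of two even integers is even",            # math/logic
    "Write a short story about a lighthouse keeper",              # creative writing
    "Refactor this function for readability: def f(x): return x", # code generation
])
```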
## Provider Capabilities

| Provider | Strengths | Best For | Integration Notes |
|---|---|---|---|
| OpenAI | Consistent performance, Strong reasoning | Production systems, Complex tasks | Requires API key setup |
| Anthropic | Safety features, Detailed analysis | Research, Technical writing | Claude-specific formatting |
| Google | Technical tasks, Multimodal support | Code generation, Analysis | Vertex AI integration available |
| Groq | High-speed inference | Real-time applications | Optimized for specific models |
| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration |
| Mistral | Open source flexibility | General applications | Self-hosted options available |
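To enable additional providers, export their keys using litellm's environment variable names in your .env file, for example:

```bash
GROQ_API_KEY=your_groq_api_key
DEEPSEEK_API_KEY=your_deepseek_api_key
MISTRAL_API_KEY=your_mistral_api_key
```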
## Performance Optimization Tips

- Token Management
  - Set an appropriate max_tokens value for the task's complexity
  - Monitor token usage for cost optimization
  - Use streaming for long outputs
- Concurrency Settings
  - Adjust max_workers based on system resources
  - Use "auto" workers for optimal CPU utilization
  - Monitor memory usage with large batch sizes
- Temperature Tuning
  - Lower (0.1-0.3) for factual/analytical tasks
  - Higher (0.7-0.9) for creative tasks
  - Mid-range (0.4-0.6) for balanced outputs
- System Prompts
  - Customize for specific domains
  - Include relevant context
  - Define clear output formats

A configuration that puts these tips together is sketched below.
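As a concrete illustration, here is one way these settings might be combined for a factual, high-throughput workload (the specific values are suggestions, not requirements):

```python
from swarms import ModelRouter

# A router tuned for factual batch analysis: conservative temperature,
# generous token budget, and automatic worker scaling
analytical_router = ModelRouter(
    temperature=0.2,     # factual/analytical range (0.1-0.3)
    max_tokens=8000,     # room for detailed outputs
    max_workers="auto",  # scale workers to the available CPUs
    system_prompt="You are a precise analyst. Answer in concise, structured bullet points.",
)

results = analytical_router.concurrent_run([
    "Summarize the key risks in the attached expansion plan",
    "List cost-reduction opportunities in the attached budget",
])
```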
## Dependencies
- asyncio: Asynchronous I/O support
- concurrent.futures: Thread pool execution
- pydantic: Data validation
- litellm: LLM interface standardization