Overview

The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It leverages a function-calling architecture to analyze tasks and recommend the optimal model and provider combination for each specific use case.

Key Features

  • Dynamic model selection based on task complexity and requirements
  • Multi-provider support (OpenAI, Anthropic, Google, etc.)
  • Concurrent and asynchronous execution capabilities
  • Batch processing with memory
  • Automatic error handling and retries
  • Provider-aware routing
  • Cost optimization

Installation

  1. Install the latest version of swarms:
pip install -U swarms
  2. Set up your API keys in your .env file:
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
GOOGLE_API_KEY=your_google_api_key
# Add more API keys as needed following litellm format
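
Before constructing the router, it can help to confirm that the expected keys are actually present in the environment. The sketch below uses a hypothetical helper, missing_api_keys, which is not part of swarms. It assumes the .env file has already been loaded into the process environment by whatever loader you use.

```python
import os

# Key names taken from the .env example above; extend as needed.
REQUIRED_KEYS = ("OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY")


def missing_api_keys(env=None, required=REQUIRED_KEYS):
    """Return the names of required keys that are unset or empty.

    Hypothetical helper for illustration; not part of swarms.
    """
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_api_keys()
    if missing:
        print("Missing API keys:", ", ".join(missing))
```

Only the providers you intend to route to need a key, so trim REQUIRED_KEYS to match your setup.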

Attributes

  • system_prompt (str, default: model_router_system_prompt): Custom prompt for guiding model selection behavior.
  • max_tokens (int, default: 4000): Maximum token limit for model outputs.
  • temperature (float, default: 0.5): Control parameter for response randomness (0.0-1.0).
  • max_workers (int | str, default: 10): Maximum concurrent workers. Use "auto" for CPU count.
  • api_key (str, default: None): API key for model access.
  • max_loops (int, default: 1): Maximum number of refinement iterations.
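
The max_workers attribute accepts either a number or the string "auto". As a rough illustration of that behaviour, the sketch below resolves the setting to a concrete worker count. resolve_max_workers is a hypothetical helper written for this page, not the library's own code, and the fallback to 1 is an assumption.

```python
import os


def resolve_max_workers(max_workers):
    """Turn a max_workers setting into a concrete thread count.

    Hypothetical helper: mirrors the documented behaviour where "auto"
    means "use the CPU count". Not the library's actual code.
    """
    if max_workers == "auto":
        # os.cpu_count() can return None on some platforms; fall back to 1.
        return os.cpu_count() or 1
    workers = int(max_workers)
    if workers < 1:
        raise ValueError("max_workers must be at least 1")
    return workers


print(resolve_max_workers(10))      # the default from the list above
print(resolve_max_workers("auto"))  # depends on the machine
```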

Methods

run()

Executes a single task through the model router with memory and refinement capabilities.
def run(self, task: str) -> str
Parameters:
  • task (str): The task to be executed
Returns: str - The result of task execution
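
This page does not spell out how memory and refinement work internally. One plausible reading of max_loops is sketched below: each pass feeds the previous answer back to the model. The refine function and its call_model parameter are hypothetical stand-ins, so treat this as a mental model rather than the router's implementation.

```python
from typing import Callable, List


def refine(task: str, call_model: Callable[[str], str], max_loops: int = 1) -> str:
    """Run a task, feeding each answer back in for up to max_loops passes.

    call_model is a stand-in for whatever the router would invoke.
    """
    history: List[str] = []  # simple in-memory record of prior answers
    prompt = task
    answer = ""
    for _ in range(max_loops):
        answer = call_model(prompt)
        history.append(answer)
        # The next pass sees the original task plus the latest draft.
        prompt = f"{task}\n\nPrevious answer:\n{answer}\n\nImprove this answer."
    return answer


# Stub model so the example runs without any API key.
def fake_model(prompt: str) -> str:
    return f"draft based on {len(prompt)} prompt characters"


print(refine("Summarize the report", fake_model, max_loops=2))
```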

batch_run()

Executes multiple tasks sequentially with result aggregation.
def batch_run(self, tasks: list) -> list
Parameters:
  • tasks (list): List of task strings to be executed
Returns: list - List of results, one for each task

concurrent_run()

Parallel execution of multiple tasks using thread pooling.
def concurrent_run(self, tasks: list) -> list
Parameters:
  • tasks (list): List of task strings to be executed
Returns: list - List of results from parallel execution
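
To make "thread pooling" concrete, here is a minimal sketch of running a list of tasks through a thread pool with the standard library. concurrent_run_sketch and slow_echo are hypothetical names, and the sketch returns results in input order. Whether concurrent_run itself guarantees that ordering is not stated on this page.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def concurrent_run_sketch(
    run: Callable[[str], str], tasks: List[str], max_workers: int = 10
) -> List[str]:
    """Run every task in a thread pool and return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of the inputs,
        # regardless of which thread finishes first.
        return list(pool.map(run, tasks))


def slow_echo(task: str) -> str:
    """Stub for a model call: sleeps briefly, then upper-cases the task."""
    time.sleep(0.05)
    return task.upper()


results = concurrent_run_sketch(slow_echo, ["a", "b", "c"], max_workers=3)
print(results)  # ['A', 'B', 'C']
```

Threads suit this workload because model calls spend most of their time waiting on network I/O rather than using the CPU.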

async_run()

Asynchronous task execution with coroutine support.
async def async_run(self, task: str) -> asyncio.Task
Parameters:
  • task (str): The task to be executed
Returns: asyncio.Task - An asyncio Task object; await the Task to obtain the task's result
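
Because the documented return type is an asyncio.Task, callers need two awaits: one to obtain the Task and one to get its result. The sketch below reproduces that shape with a stub so it runs without API keys. async_run_sketch and run_all are hypothetical names, not part of swarms.

```python
import asyncio


async def async_run_sketch(task: str) -> "asyncio.Task[str]":
    """Stand-in with the documented shape: awaiting it yields a Task."""

    async def _work() -> str:
        await asyncio.sleep(0.01)  # simulate model latency
        return f"done: {task}"

    # create_task schedules _work() on the running loop and returns at once.
    return asyncio.create_task(_work())


async def run_all(tasks):
    # First await: obtain the Task objects (the work starts running).
    pending = [await async_run_sketch(t) for t in tasks]
    # Second await: wait for every Task and collect the results in order.
    return await asyncio.gather(*pending)


results = asyncio.run(run_all(["a", "b"]))
print(results)  # ['done: a', 'done: b']
```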

Usage Examples

Basic Usage

from swarms import ModelRouter

router = ModelRouter()

# Simple text analysis
result = router.run("Analyze the sentiment and key themes in this customer feedback")

# Complex reasoning task
complex_result = router.run("""
Evaluate the following business proposal:
- Initial investment: $500,000
- Projected ROI: 25% annually
- Market size: $2B
- Competition: 3 major players
Provide detailed analysis and recommendations.
""")

Batch Processing

from swarms import ModelRouter

router = ModelRouter()

# Multiple analysis tasks
tasks = [
    "Analyze Q1 financial performance",
    "Predict Q2 market trends",
    "Evaluate competitor strategies",
    "Generate growth recommendations"
]

results = router.batch_run(tasks)

# Process results
for task, result in zip(tasks, results):
    print(f"Task: {task}\nResult: {result}\n")

Concurrent Execution

from swarms import ModelRouter

router = ModelRouter()

# Define multiple concurrent tasks
analysis_tasks = [
    "Perform technical analysis of AAPL stock",
    "Analyze market sentiment from social media",
    "Generate trading signals",
    "Calculate risk metrics"
]

# Execute tasks concurrently
results = router.concurrent_run(analysis_tasks)

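# Process results with error handling.
# process_analysis, save_to_database, and log_error are placeholders for
# application-specific functions; define them before running this example.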
for task, result in zip(analysis_tasks, results):
    try:
        processed_result = process_analysis(result)
        save_to_database(processed_result)
    except Exception as e:
        log_error(f"Error processing {task}: {str(e)}")

Asynchronous Execution

import asyncio
from swarms import ModelRouter

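async def process_data_stream(data_stream):
    # data_stream is any async iterable supplied by the caller
    router = ModelRouter()
    tasks = []
    async for data in data_stream:
        # async_run is documented to return an asyncio.Task
        task = await router.async_run(f"Process data: {data}")
        tasks.append(task)

    # Wait for all scheduled tasks and collect their results
    results = await asyncio.gather(*tasks)
    return results

# Usage in async context
async def main(data_stream):
    results = await process_data_stream(data_stream)
    return results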

Financial Analysis System

from swarms import ModelRouter
from typing import Dict

class FinancialAnalysisSystem:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.3,  # Lower temperature for more deterministic outputs
            max_tokens=8000,  # Higher token limit for detailed analysis
            max_loops=2  # Allow for refinement iteration
        )

    def analyze_company_financials(self, financial_data: Dict) -> Dict:
        analysis_task = f"""
        Perform comprehensive financial analysis:

        Financial Metrics:
        - Revenue: ${financial_data['revenue']}M
        - EBITDA: ${financial_data['ebitda']}M
        - Debt/Equity: {financial_data['debt_equity']}
        - Working Capital: ${financial_data['working_capital']}M

        Required Analysis:
        1. Profitability assessment
        2. Liquidity analysis
        3. Growth projections
        4. Risk evaluation
        5. Investment recommendations

        Provide detailed insights and actionable recommendations.
        """

        result = self.router.run(analysis_task)
        return self._parse_analysis_result(result)

    def _parse_analysis_result(self, result: str) -> Dict:
        # Implementation of result parsing
        pass

# Usage
analyzer = FinancialAnalysisSystem()
company_data = {
    'revenue': 150,
    'ebitda': 45,
    'debt_equity': 0.8,
    'working_capital': 25
}

analysis = analyzer.analyze_company_financials(company_data)

Healthcare Data Processing Pipeline

import asyncio
from swarms import ModelRouter
from typing import List, Dict

class MedicalDataProcessor:
    def __init__(self):
        self.router = ModelRouter(
            max_workers="auto",  # Automatic worker scaling
            temperature=0.2,     # Conservative temperature for medical analysis
            system_prompt="""You are a specialized medical data analyzer focused on:
            1. Clinical terminology interpretation
            2. Patient data analysis
            3. Treatment recommendation review
            4. Medical research synthesis"""
        )

    async def process_patient_records(self, records: List[Dict]) -> List[Dict]:
        analysis_tasks = []

        for record in records:
            task = f"""
            Analyze patient record:
            - Age: {record['age']}
            - Symptoms: {', '.join(record['symptoms'])}
            - Vital Signs: {record['vitals']}
            - Medications: {', '.join(record['medications'])}
            - Lab Results: {record['lab_results']}

            Provide:
            1. Symptom analysis
            2. Medication interaction check
            3. Lab results interpretation
            4. Treatment recommendations
            """
            analysis_tasks.append(task)

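        # async_run is documented to return an asyncio.Task, so first await
        # the calls to obtain the Task objects, then await the Tasks themselves.
        pending = await asyncio.gather(*[
            self.router.async_run(task) for task in analysis_tasks
        ])
        results = await asyncio.gather(*pending)

        return [self._parse_medical_analysis(r) for r in results]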

    def _parse_medical_analysis(self, analysis: str) -> Dict:
        # Implementation of medical analysis parsing
        pass

# Usage
async def main():
    processor = MedicalDataProcessor()
    patient_records = [
        {
            'age': 45,
            'symptoms': ['fever', 'cough', 'fatigue'],
            'vitals': {'bp': '120/80', 'temp': '38.5C'},
            'medications': ['lisinopril', 'metformin'],
            'lab_results': 'WBC: 11,000, CRP: 2.5'
        }
        # More records...
    ]

    analyses = await processor.process_patient_records(patient_records)
    return analyses

asyncio.run(main())

NLP Processing Pipeline

from swarms import ModelRouter
from typing import List, Dict

class NLPPipeline:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.4,
            max_loops=2
        )

    def process_documents(self, documents: List[str]) -> List[Dict]:
        tasks = [self._create_nlp_task(doc) for doc in documents]
        results = self.router.concurrent_run(tasks)
        return [self._parse_nlp_result(r) for r in results]

    def _create_nlp_task(self, document: str) -> str:
        return f"""
        Perform comprehensive NLP analysis:

        Text: {document}

        Required Analysis:
        1. Entity recognition
        2. Sentiment analysis
        3. Topic classification
        4. Key phrase extraction
        5. Intent detection

        Provide structured analysis with confidence scores.
        """

    def _parse_nlp_result(self, result: str) -> Dict:
        # Implementation of NLP result parsing
        pass

# Usage
pipeline = NLPPipeline()
documents = [
    "We're extremely satisfied with the new product features!",
    "The customer service response time needs improvement.",
    "Looking to upgrade our subscription plan next month."
]

analyses = pipeline.process_documents(documents)

Available Models and Use Cases

| Model | Provider | Optimal Use Cases | Characteristics |
| --- | --- | --- | --- |
| gpt-4-turbo | OpenAI | Complex reasoning, Code generation, Creative writing | High accuracy, Latest knowledge cutoff |
| claude-3-opus | Anthropic | Research analysis, Technical documentation, Long-form content | Strong reasoning, Detailed outputs |
| gemini-pro | Google | Multimodal tasks, Code generation, Technical analysis | Fast inference, Strong coding abilities |
| mistral-large | Mistral | General tasks, Content generation, Classification | Open source, Good price/performance |
| deepseek-reasoner | DeepSeek | Mathematical analysis, Logic problems, Scientific computing | Specialized reasoning capabilities |
Provider Capabilities

| Provider | Strengths | Best For | Integration Notes |
| --- | --- | --- | --- |
| OpenAI | Consistent performance, Strong reasoning | Production systems, Complex tasks | Requires API key setup |
| Anthropic | Safety features, Detailed analysis | Research, Technical writing | Claude-specific formatting |
| Google | Technical tasks, Multimodal support | Code generation, Analysis | Vertex AI integration available |
| Groq | High-speed inference | Real-time applications | Optimized for specific models |
| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration |
| Mistral | Open source flexibility | General applications | Self-hosted options available |

Performance Optimization Tips

  1. Token Management
    • Set appropriate max_tokens based on task complexity
    • Monitor token usage for cost optimization
    • Use streaming for long outputs
  2. Concurrency Settings
    • Adjust max_workers based on system resources
    • Use "auto" workers for optimal CPU utilization
    • Monitor memory usage with large batch sizes
  3. Temperature Tuning
    • Lower (0.1-0.3) for factual/analytical tasks
    • Higher (0.7-0.9) for creative tasks
    • Mid-range (0.4-0.6) for balanced outputs
  4. System Prompts
    • Customize for specific domains
    • Include relevant context
    • Define clear output formats
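
The temperature ranges in tip 3 can be captured in a small lookup so that callers pick a value by task type instead of hard-coding numbers. This is a sketch only: TEMPERATURE_RANGES and suggested_temperature are hypothetical names, and returning the midpoint of each range is an arbitrary choice.

```python
# Ranges copied from the Temperature Tuning tip above.
TEMPERATURE_RANGES = {
    "factual": (0.1, 0.3),
    "balanced": (0.4, 0.6),
    "creative": (0.7, 0.9),
}


def suggested_temperature(kind: str) -> float:
    """Return the midpoint of the recommended range for a task type."""
    try:
        low, high = TEMPERATURE_RANGES[kind]
    except KeyError:
        raise ValueError(f"unknown task type: {kind!r}") from None
    # Round to avoid floating-point noise such as 0.7999999999999999.
    return round((low + high) / 2, 2)


print(suggested_temperature("factual"))   # 0.2
print(suggested_temperature("creative"))  # 0.8
```

The result can then be passed as the temperature argument when constructing the router, as in the examples earlier on this page.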

Dependencies

  • asyncio: Asynchronous I/O support
  • concurrent.futures: Thread pool execution
  • pydantic: Data validation
  • litellm: LLM interface standardization

Source Code

View the source code on GitHub