
Overview

The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It leverages a function-calling architecture to analyze tasks and recommend the optimal model and provider combination for each specific use case.

Key Features

  • Dynamic model selection based on task complexity and requirements
  • Multi-provider support (OpenAI, Anthropic, Google, etc.)
  • Concurrent and asynchronous execution capabilities
  • Batch processing with memory
  • Automatic error handling and retries
  • Provider-aware routing
  • Cost optimization

Installation

  1. Install the latest version of swarms:
pip install -U swarms
  2. Set up your API keys in your .env file:
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
GOOGLE_API_KEY=your_google_api_key
# Add more API keys as needed following litellm format
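
If your keys live in a .env file, load them into the environment before constructing the router. A minimal sketch, assuming the python-dotenv package is installed (the keys themselves are read from environment variables in litellm's standard format):

from dotenv import load_dotenv
from swarms import ModelRouter

load_dotenv()  # copies OPENAI_API_KEY, ANTHROPIC_API_KEY, etc. from .env into the process environment

router = ModelRouter()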

Attributes

| Attribute | Type | Default | Description |
|---|---|---|---|
| `system_prompt` | `str` | `model_router_system_prompt` | Custom prompt for guiding model selection behavior. |
| `max_tokens` | `int` | `4000` | Maximum token limit for model outputs. |
| `temperature` | `float` | `0.5` | Controls response randomness (0.0-1.0). |
| `max_workers` | `int` or `str` | `10` | Maximum number of concurrent workers. Use `"auto"` to match the CPU count. |
| `api_key` | `str` | `None` | API key for model access. |
| `max_loops` | `int` | `1` | Maximum number of refinement iterations. |
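
The constructor accepts these attributes as keyword arguments. A minimal construction sketch (the values are illustrative, not recommendations):

from swarms import ModelRouter

router = ModelRouter(
    temperature=0.3,     # lower randomness for analytical work
    max_tokens=8000,     # allow longer, more detailed outputs
    max_workers="auto",  # scale concurrent workers to the CPU count
    max_loops=2,         # permit one refinement pass over the initial answer
)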

Methods

run()

Executes a single task through the model router with memory and refinement capabilities.
def run(self, task: str) -> str
Parameters:
  • task (str): The task to be executed
Returns: str - The result of task execution

batch_run()

Executes multiple tasks sequentially with result aggregation.
def batch_run(self, tasks: list) -> list
Parameters:
  • tasks (list): List of task strings to be executed
Returns: list - List of results, one for each task

concurrent_run()

Parallel execution of multiple tasks using thread pooling.
def concurrent_run(self, tasks: list) -> list
Parameters:
  • tasks (list): List of task strings to be executed
Returns: list - List of results from parallel execution

async_run()

Asynchronous task execution with coroutine support.
async def async_run(self, task: str) -> asyncio.Task
Parameters:
  • task (str): The task to be executed
Returns: asyncio.Task - An asyncio Task object

Usage Examples

Basic Usage

from swarms import ModelRouter

router = ModelRouter()

# Simple text analysis
result = router.run("Analyze the sentiment and key themes in this customer feedback")

# Complex reasoning task
complex_result = router.run("""
Evaluate the following business proposal:
- Initial investment: $500,000
- Projected ROI: 25% annually
- Market size: $2B
- Competition: 3 major players
Provide detailed analysis and recommendations.
""")

Batch Processing

from swarms import ModelRouter

router = ModelRouter()

# Multiple analysis tasks
tasks = [
    "Analyze Q1 financial performance",
    "Predict Q2 market trends",
    "Evaluate competitor strategies",
    "Generate growth recommendations"
]

results = router.batch_run(tasks)

# Process results
for task, result in zip(tasks, results):
    print(f"Task: {task}\nResult: {result}\n")

Concurrent Execution

from swarms import ModelRouter

router = ModelRouter()

# Define multiple concurrent tasks
analysis_tasks = [
    "Perform technical analysis of AAPL stock",
    "Analyze market sentiment from social media",
    "Generate trading signals",
    "Calculate risk metrics"
]

# Execute tasks concurrently
results = router.concurrent_run(analysis_tasks)

# Process results with error handling
# (process_analysis, save_to_database, and log_error below are placeholders
# for your own post-processing, persistence, and logging helpers)
for task, result in zip(analysis_tasks, results):
    try:
        processed_result = process_analysis(result)
        save_to_database(processed_result)
    except Exception as e:
        log_error(f"Error processing {task}: {str(e)}")

Asynchronous Execution

import asyncio
from swarms import ModelRouter

async def process_data_stream():
    router = ModelRouter()
    tasks = []
    # data_stream is a placeholder for any async iterable that yields items to process
    async for data in data_stream:
        # async_run returns an asyncio.Task; all tasks are awaited together below
        task = await router.async_run(f"Process data: {data}")
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results

# Usage in async context
async def main():
    results = await process_data_stream()
    return results

asyncio.run(main())

Financial Analysis System

from swarms import ModelRouter
from typing import Dict, List
import pandas as pd

class FinancialAnalysisSystem:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.3,  # Lower temperature for more deterministic outputs
            max_tokens=8000,  # Higher token limit for detailed analysis
            max_loops=2  # Allow for refinement iteration
        )

    def analyze_company_financials(self, financial_data: Dict) -> Dict:
        analysis_task = f"""
        Perform comprehensive financial analysis:

        Financial Metrics:
        - Revenue: ${financial_data['revenue']}M
        - EBITDA: ${financial_data['ebitda']}M
        - Debt/Equity: {financial_data['debt_equity']}
        - Working Capital: ${financial_data['working_capital']}M

        Required Analysis:
        1. Profitability assessment
        2. Liquidity analysis
        3. Growth projections
        4. Risk evaluation
        5. Investment recommendations

        Provide detailed insights and actionable recommendations.
        """

        result = self.router.run(analysis_task)
        return self._parse_analysis_result(result)

    def _parse_analysis_result(self, result: str) -> Dict:
        # Implementation of result parsing
        pass

# Usage
analyzer = FinancialAnalysisSystem()
company_data = {
    'revenue': 150,
    'ebitda': 45,
    'debt_equity': 0.8,
    'working_capital': 25
}

analysis = analyzer.analyze_company_financials(company_data)

Healthcare Data Processing Pipeline

import asyncio
from swarms import ModelRouter
from typing import List, Dict

class MedicalDataProcessor:
    def __init__(self):
        self.router = ModelRouter(
            max_workers="auto",  # Automatic worker scaling
            temperature=0.2,     # Conservative temperature for medical analysis
            system_prompt="""You are a specialized medical data analyzer focused on:
            1. Clinical terminology interpretation
            2. Patient data analysis
            3. Treatment recommendation review
            4. Medical research synthesis"""
        )

    async def process_patient_records(self, records: List[Dict]) -> List[Dict]:
        analysis_tasks = []

        for record in records:
            task = f"""
            Analyze patient record:
            - Age: {record['age']}
            - Symptoms: {', '.join(record['symptoms'])}
            - Vital Signs: {record['vitals']}
            - Medications: {', '.join(record['medications'])}
            - Lab Results: {record['lab_results']}

            Provide:
            1. Symptom analysis
            2. Medication interaction check
            3. Lab results interpretation
            4. Treatment recommendations
            """
            analysis_tasks.append(task)

        results = await asyncio.gather(*[
            self.router.async_run(task) for task in analysis_tasks
        ])

        return [self._parse_medical_analysis(r) for r in results]

    def _parse_medical_analysis(self, analysis: str) -> Dict:
        # Implementation of medical analysis parsing
        pass

# Usage
async def main():
    processor = MedicalDataProcessor()
    patient_records = [
        {
            'age': 45,
            'symptoms': ['fever', 'cough', 'fatigue'],
            'vitals': {'bp': '120/80', 'temp': '38.5C'},
            'medications': ['lisinopril', 'metformin'],
            'lab_results': 'WBC: 11,000, CRP: 2.5'
        }
        # More records...
    ]

    analyses = await processor.process_patient_records(patient_records)
    return analyses

asyncio.run(main())

NLP Processing Pipeline

from swarms import ModelRouter
from typing import List, Dict

class NLPPipeline:
    def __init__(self):
        self.router = ModelRouter(
            temperature=0.4,
            max_loops=2
        )

    def process_documents(self, documents: List[str]) -> List[Dict]:
        tasks = [self._create_nlp_task(doc) for doc in documents]
        results = self.router.concurrent_run(tasks)
        return [self._parse_nlp_result(r) for r in results]

    def _create_nlp_task(self, document: str) -> str:
        return f"""
        Perform comprehensive NLP analysis:

        Text: {document}

        Required Analysis:
        1. Entity recognition
        2. Sentiment analysis
        3. Topic classification
        4. Key phrase extraction
        5. Intent detection

        Provide structured analysis with confidence scores.
        """

    def _parse_nlp_result(self, result: str) -> Dict:
        # Implementation of NLP result parsing
        pass

# Usage
pipeline = NLPPipeline()
documents = [
    "We're extremely satisfied with the new product features!",
    "The customer service response time needs improvement.",
    "Looking to upgrade our subscription plan next month."
]

analyses = pipeline.process_documents(documents)

Available Models and Use Cases

| Model | Provider | Optimal Use Cases | Characteristics |
|---|---|---|---|
| gpt-4-turbo | OpenAI | Complex reasoning, Code generation, Creative writing | High accuracy, Latest knowledge cutoff |
| claude-3-opus | Anthropic | Research analysis, Technical documentation, Long-form content | Strong reasoning, Detailed outputs |
| gemini-pro | Google | Multimodal tasks, Code generation, Technical analysis | Fast inference, Strong coding abilities |
| mistral-large | Mistral | General tasks, Content generation, Classification | Open source, Good price/performance |
| deepseek-reasoner | DeepSeek | Mathematical analysis, Logic problems, Scientific computing | Specialized reasoning capabilities |

Provider Capabilities

| Provider | Strengths | Best For | Integration Notes |
|---|---|---|---|
| OpenAI | Consistent performance, Strong reasoning | Production systems, Complex tasks | Requires API key setup |
| Anthropic | Safety features, Detailed analysis | Research, Technical writing | Claude-specific formatting |
| Google | Technical tasks, Multimodal support | Code generation, Analysis | Vertex AI integration available |
| Groq | High-speed inference | Real-time applications | Optimized for specific models |
| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration |
| Mistral | Open source flexibility | General applications | Self-hosted options available |

Performance Optimization Tips

  1. Token Management
    • Set appropriate max_tokens based on task complexity
    • Monitor token usage for cost optimization
    • Use streaming for long outputs
  2. Concurrency Settings
    • Adjust max_workers based on system resources
    • Use "auto" workers for optimal CPU utilization
    • Monitor memory usage with large batch sizes
  3. Temperature Tuning
    • Lower (0.1-0.3) for factual/analytical tasks
    • Higher (0.7-0.9) for creative tasks
    • Mid-range (0.4-0.6) for balanced outputs
  4. System Prompts
    • Customize for specific domains
    • Include relevant context
    • Define clear output formats
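
As a rough sketch of how these settings combine for a factual, analysis-heavy workload (values are illustrative and should be tuned to your tasks and hardware):

from swarms import ModelRouter

analysis_router = ModelRouter(
    temperature=0.2,     # analytical range (0.1-0.3)
    max_tokens=8000,     # sized for detailed reports; reduce for short answers
    max_workers="auto",  # match worker count to available CPU cores
    system_prompt=(
        "You are a financial analysis assistant. "
        "Return findings as numbered sections, starting with a short summary."
    ),
)

reports = analysis_router.concurrent_run([
    "Summarize Q1 revenue drivers",
    "List the top three cost risks for Q2",
])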

Dependencies

  • asyncio: Asynchronous I/O support
  • concurrent.futures: Thread pool execution
  • pydantic: Data validation
  • litellm: LLM interface standardization

Source Code

View the source code on GitHub