# CronJob

> Schedule and run Swarms agents at specified intervals with cron-style scheduling

## Overview

The **CronJob** class wraps any callable (including Swarms agents) and turns it into a scheduled job that runs at specified intervals. It provides robust scheduling, error handling, retry logic, and execution tracking with optional callback support for output customization.

## Constructor

Create a CronJob instance to schedule recurring agent executions.

```python theme={null}
from swarms.structs.cron_job import CronJob
from swarms import Agent

# Create an agent
agent = Agent(
    agent_name="Financial-Analyst",
    system_prompt="You are a financial analyst...",
    model_name="gpt-4"
)

# Create scheduled job
cron = CronJob(
    agent=agent,
    interval="10minutes",
    job_id="financial_analysis_job"
)
```

### Parameters

<ParamField path="agent" type="Union[Any, Callable]" default="None">
  The Swarms Agent instance or callable to be scheduled
</ParamField>

<ParamField path="interval" type="str" default="None">
  The interval string (e.g., "5seconds", "10minutes", "1hour")
</ParamField>

<ParamField path="job_id" type="str" default="None">
  Optional unique identifier for the job. If not provided, one will be generated.
</ParamField>

<ParamField path="callback" type="Callable[[Any, str, dict], Any]" default="None">
  Optional callback function to customize output processing.

  **Signature:** `callback(output: Any, task: str, metadata: dict) -> Any`

  * `output`: The original output from the agent
  * `task`: The task that was executed
  * `metadata`: Dictionary containing `job_id`, `timestamp`, `execution_count`, and the other fields listed under Callback Metadata
  * Returns: The customized output
</ParamField>

### Attributes

<ResponseField name="job_id" type="str">
  Unique identifier for the job
</ResponseField>

<ResponseField name="is_running" type="bool">
  Flag indicating if the job is currently running
</ResponseField>

<ResponseField name="execution_count" type="int">
  Number of times the job has been executed
</ResponseField>

<ResponseField name="start_time" type="float">
  Timestamp when the job was started
</ResponseField>

## Methods

### run

Schedule and run the job with a specified task.

```python theme={null}
cron.run(
    task="Analyze Q4 earnings for AAPL",
    img="/path/to/chart.png"  # Optional image parameter
)
```

<ParamField path="task" type="str" required>
  The task string to be executed by the agent
</ParamField>

<ParamField path="**kwargs" type="Any">
  Additional parameters to pass to the agent's run method (e.g., `img`, `imgs`, `correct_answer`, `streaming_callback`)
</ParamField>

**Raises:**

* `CronJobConfigError`: If agent or interval is not configured
* `CronJobExecutionError`: If task execution fails

**Behavior:**

* Schedules the task according to the configured interval
* Starts the background execution thread
* Blocks until KeyboardInterrupt (Ctrl+C) is received
* Automatically stops on interrupt

### batched\_run

Run multiple tasks sequentially with the same schedule.

```python theme={null}
tasks = [
    "Analyze AAPL stock",
    "Analyze GOOGL stock",
    "Analyze MSFT stock"
]

results = cron.batched_run(tasks)
```

<ParamField path="tasks" type="List[str]" required>
  List of task strings to execute
</ParamField>

<ParamField path="**kwargs" type="Any">
  Additional parameters to pass to the agent's run method
</ParamField>

<ResponseField name="return" type="List[Any]">
  List of results from each task execution
</ResponseField>

### start

Manually start the scheduled job.

```python theme={null}
cron.start()
```

**Raises:**

* `CronJobExecutionError`: If the job fails to start

**Behavior:**

* Creates a daemon thread for job execution
* Sets `is_running` to `True`
* Records `start_time`
* If already running, logs a warning

### stop

Stop the scheduled job.

```python theme={null}
cron.stop()
```

**Raises:**

* `CronJobExecutionError`: If the job fails to stop properly

**Behavior:**

* Sets `is_running` to `False`
* Waits up to 5 seconds for the thread to terminate
* Clears the schedule
* Logs a warning if the thread does not terminate within that time
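
The lifecycle described for `start` and `stop` (a daemon thread, an `is_running` flag, a bounded wait on shutdown) can be sketched with the standard library alone. The `IntervalRunner` class below is a hypothetical stand-in written for illustration. It is not CronJob's implementation, and it assumes a fixed interval given in seconds.

```python
import threading
import time


class IntervalRunner:
    """Illustrative stand-in for the start/stop lifecycle (not CronJob itself)."""

    def __init__(self, func, interval_seconds: float):
        self.func = func
        self.interval_seconds = interval_seconds
        self.is_running = False
        self.start_time = None
        self.execution_count = 0
        self._stop_event = threading.Event()
        self._thread = None

    def _loop(self):
        # Event.wait returns True once stop() sets the event, which ends the loop.
        while not self._stop_event.wait(self.interval_seconds):
            self.func()
            self.execution_count += 1

    def start(self):
        if self.is_running:
            print("Job is already running")  # stands in for the logged warning
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self.is_running = True
        self.start_time = time.time()
        self._thread.start()

    def stop(self, timeout: float = 5.0):
        self.is_running = False
        self._stop_event.set()
        if self._thread is not None:
            # Bounded wait: give the worker up to `timeout` seconds to exit.
            self._thread.join(timeout=timeout)
            if self._thread.is_alive():
                print("Thread did not terminate within the timeout")


runner = IntervalRunner(lambda: None, interval_seconds=0.01)
runner.start()
time.sleep(0.1)
runner.stop()
```

Because the worker is a daemon thread, it cannot keep the interpreter alive on its own. The explicit `stop()` with a timeout is what gives an in-flight execution a chance to finish.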

### set\_callback

Set or update the callback function for output customization.

```python theme={null}
def custom_callback(output, task, metadata):
    # Process output (the output type depends on the agent, so coerce to str first)
    processed = str(output).upper()
    
    # Log execution info
    print(f"Execution #{metadata['execution_count']}")
    print(f"Task: {task}")
    print(f"Timestamp: {metadata['timestamp']}")
    
    return processed

cron.set_callback(custom_callback)
```

<ParamField path="callback" type="Callable[[Any, str, dict], Any]" required>
  Callback function with signature: `callback(output, task, metadata) -> Any`
</ParamField>

### get\_execution\_stats

Get execution statistics for the cron job.

```python theme={null}
stats = cron.get_execution_stats()
print(stats)
# {
#   "job_id": "financial_analysis_job",
#   "is_running": True,
#   "execution_count": 42,
#   "start_time": 1234567890.123,
#   "uptime": 3600.5,
#   "interval": "10minutes"
# }
```

<ResponseField name="return" type="dict">
  Dictionary containing:

  * `job_id`: Job identifier
  * `is_running`: Current running status
  * `execution_count`: Number of executions
  * `start_time`: Start timestamp
  * `uptime`: Time elapsed since start (seconds)
  * `interval`: Configured interval string
</ResponseField>
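
As a rough illustration of consuming this dictionary, the sketch below turns it into a single log line and derives an executions-per-hour rate from `execution_count` and `uptime`. The `summarize_stats` helper is hypothetical (not part of the library), and the sample values are copied from the example output above.

```python
def summarize_stats(stats: dict) -> str:
    """Format a stats dictionary shaped like the one above as one log line."""
    uptime = stats.get("uptime") or 0.0
    count = stats.get("execution_count", 0)
    # Guard against division by zero right after the job starts.
    per_hour = count / uptime * 3600 if uptime > 0 else 0.0
    state = "running" if stats.get("is_running") else "stopped"
    return (
        f"[{stats.get('job_id')}] {state}, {count} executions "
        f"in {uptime:.1f}s ({per_hour:.1f}/hour, interval={stats.get('interval')})"
    )


sample = {
    "job_id": "financial_analysis_job",
    "is_running": True,
    "execution_count": 42,
    "start_time": 1234567890.123,
    "uptime": 3600.5,
    "interval": "10minutes",
}
print(summarize_stats(sample))
```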

## Interval Formats

The `interval` parameter accepts strings in the format `<number><unit>`:

### Supported Units

* **Seconds**: `"5seconds"`, `"1second"`
* **Minutes**: `"10minutes"`, `"1minute"`
* **Hours**: `"2hours"`, `"1hour"`
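
To make the `<number><unit>` format concrete, here is a minimal sketch of how such a string could be split and validated. The `parse_interval` and `interval_to_seconds` helpers are hypothetical and are not the library's internal parser. They assume only the units listed above, in singular or plural form.

```python
import re
from typing import Tuple

# Units listed in this section, singular and plural (assumed to be the full set).
UNIT_SECONDS = {
    "second": 1, "seconds": 1,
    "minute": 60, "minutes": 60,
    "hour": 3600, "hours": 3600,
}


def parse_interval(interval: str) -> Tuple[int, str]:
    """Split '<number><unit>' into (number, unit); raise ValueError if malformed."""
    match = re.fullmatch(r"(\d+)([a-z]+)", interval.strip().lower())
    if match is None:
        raise ValueError(f"Invalid interval format: {interval!r}")
    number, unit = int(match.group(1)), match.group(2)
    if number <= 0 or unit not in UNIT_SECONDS:
        raise ValueError(f"Invalid interval: {interval!r}")
    return number, unit


def interval_to_seconds(interval: str) -> int:
    """Convert an interval string to a number of seconds."""
    number, unit = parse_interval(interval)
    return number * UNIT_SECONDS[unit]


print(parse_interval("10minutes"))
print(interval_to_seconds("2hours"))
```

Validating the string up front like this lets a typo such as `"10mins"` fail immediately rather than at the first scheduled run.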

### Examples

```python theme={null}
# Every 5 seconds
CronJob(agent=agent, interval="5seconds")

# Every 30 seconds
CronJob(agent=agent, interval="30seconds")

# Every 10 minutes
CronJob(agent=agent, interval="10minutes")

# Every 2 hours
CronJob(agent=agent, interval="2hours")
```

## Complete Examples

### Basic Scheduled Analysis

```python theme={null}
from swarms import Agent
from swarms.structs.cron_job import CronJob

# Create financial analyst agent
agent = Agent(
    agent_name="Market-Analyst",
    system_prompt="""You are an expert market analyst.
    Provide concise daily market updates focusing on:
    1. Major index movements
    2. Notable sector performance
    3. Key market drivers
    """,
    model_name="gpt-4",
    max_loops=1,
    verbose=True
)

# Schedule to run every hour
cron = CronJob(
    agent=agent,
    interval="1hour",
    job_id="hourly_market_update"
)

# Run the scheduled job
try:
    cron.run(task="Provide a market update for the current hour")
except KeyboardInterrupt:
    print("Stopping scheduled job...")
    cron.stop()
```

### Multi-Stock Analysis with Batching

```python theme={null}
from swarms import Agent
from swarms.structs.cron_job import CronJob

# Create stock analysis agent
stock_agent = Agent(
    agent_name="Stock-Analyzer",
    system_prompt="Analyze stock performance and provide investment insights.",
    model_name="gpt-4",
    max_loops=2
)

# Schedule every 30 minutes
cron = CronJob(
    agent=stock_agent,
    interval="30minutes",
    job_id="stock_analysis"
)

# Analyze multiple stocks
stocks = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
tasks = [f"Analyze {symbol} stock performance" for symbol in stocks]

try:
    results = cron.batched_run(tasks)
    for symbol, result in zip(stocks, results):
        print(f"\n{symbol} Analysis:")
        print(result)
except KeyboardInterrupt:
    print("Analysis stopped.")
    cron.stop()
```

### Custom Callback for Output Processing

```python theme={null}
from swarms import Agent
from swarms.structs.cron_job import CronJob
import json
from datetime import datetime

def save_analysis_callback(output, task, metadata):
    """Save analysis results to file with metadata."""
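    # Uptime is the time elapsed between job start and this execution
    start_time = metadata.get("start_time") or metadata["timestamp"]
    result = {
        "job_id": metadata["job_id"],
        "execution_number": metadata["execution_count"],
        "timestamp": datetime.fromtimestamp(metadata["timestamp"]).isoformat(),
        "task": task,
        "analysis": output,
        "uptime_seconds": metadata["timestamp"] - start_time
    }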
    
    # Save to file
    filename = f"analysis_{metadata['execution_count']}.json"
    with open(filename, 'w') as f:
        json.dump(result, f, indent=2)
    
    print(f"Saved analysis #{metadata['execution_count']} to {filename}")
    return output

# Create agent
agent = Agent(
    agent_name="Crypto-Analyst",
    system_prompt="Analyze cryptocurrency market trends.",
    model_name="gpt-4"
)

# Create cron with callback
cron = CronJob(
    agent=agent,
    interval="15minutes",
    job_id="crypto_analysis",
    callback=save_analysis_callback
)

try:
    cron.run(task="Analyze Bitcoin and Ethereum market trends")
except KeyboardInterrupt:
    cron.stop()
```

### Image Analysis with Scheduling

```python theme={null}
from swarms import Agent
from swarms.structs.cron_job import CronJob

# Create vision-capable agent
vision_agent = Agent(
    agent_name="Chart-Analyzer",
    system_prompt="Analyze financial charts and provide technical insights.",
    model_name="claude-sonnet-4-6",  # Vision-capable model
    max_loops=1
)

# Schedule chart analysis every 2 hours
cron = CronJob(
    agent=vision_agent,
    interval="2hours",
    job_id="chart_analysis"
)

try:
    cron.run(
        task="Analyze this chart for support/resistance levels and trend direction",
        img="/path/to/chart.png"
    )
except KeyboardInterrupt:
    cron.stop()
```

### Monitoring Execution Stats

```python theme={null}
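from swarms import Agent
from swarms.structs.cron_job import CronJob
import threading
import time

agent = Agent(
    agent_name="Monitor-Agent",
    system_prompt="Monitor system metrics.",
    model_name="gpt-4"
)

cron = CronJob(
    agent=agent,
    interval="10seconds",
    job_id="monitor"
)

# run() schedules the task and then blocks, so call it from a background
# thread to keep the main thread free for monitoring
threading.Thread(
    target=cron.run,
    kwargs={"task": "Report current system metrics"},
    daemon=True,
).start()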

try:
    # Monitor stats every 30 seconds
    while True:
        time.sleep(30)
        stats = cron.get_execution_stats()
        print("\nExecution Stats:")
        print(f"  Running: {stats['is_running']}")
        print(f"  Executions: {stats['execution_count']}")
        print(f"  Uptime: {stats['uptime']:.2f}s")
except KeyboardInterrupt:
    print("Stopping monitor...")
    cron.stop()
```

## Exception Handling

The CronJob class defines several custom exceptions:

### CronJobError

Base exception class for all CronJob errors.

### CronJobConfigError

Raised for configuration errors.

```python theme={null}
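from swarms.structs.cron_job import CronJob, CronJobConfigError

try:
    # No agent is configured, so run() has nothing to schedule
    cron = CronJob(agent=None, interval="10minutes")
    cron.run(task="Analyze Q4 earnings for AAPL")
except CronJobConfigError as e:
    print(f"Configuration error: {e}")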

### CronJobScheduleError

Raised for scheduling related errors.

```python theme={null}
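from swarms.structs.cron_job import CronJob, CronJobConfigError, CronJobScheduleError

try:
    cron = CronJob(agent=agent, interval="invalid_format")
except (CronJobConfigError, CronJobScheduleError) as e:
    # An unparseable interval may surface as either error type
    print(f"Invalid interval: {e}")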

### CronJobExecutionError

Raised for execution related errors.

```python theme={null}
from swarms.structs.cron_job import CronJobExecutionError

try:
    cron.run(task="")
except CronJobExecutionError as e:
    print(f"Execution failed: {e}")
```
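
Since `CronJobError` is described as the base class for all CronJob errors, handlers for specific errors should come before a handler for the base class. The sketch below shows that ordering with stand-in classes so that it runs without the library installed. It assumes each specific error subclasses `CronJobError`. In real code, import the exceptions instead of redefining them.

```python
# Stand-in classes mirroring the hierarchy described above (assumption:
# every specific error subclasses CronJobError). Do not redefine these in
# real code; import them from the library instead.
class CronJobError(Exception):
    pass


class CronJobConfigError(CronJobError):
    pass


class CronJobExecutionError(CronJobError):
    pass


def classify(action):
    """Run action() and report which handler caught its error, if any."""
    try:
        action()
    except CronJobConfigError as e:
        return f"config: {e}"
    except CronJobError as e:
        # The base-class handler goes last so it does not shadow specific ones.
        return f"other cron error: {e}"
    return "ok"


def bad_config():
    raise CronJobConfigError("interval missing")


def bad_run():
    raise CronJobExecutionError("task failed")


print(classify(bad_config))
print(classify(bad_run))
```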

## Best Practices

1. **Use descriptive job IDs**: Make job identifiers meaningful for tracking
2. **Set appropriate intervals**: Choose intervals based on task complexity and resource availability
3. **Implement callbacks**: Use callbacks for logging, saving results, or sending notifications
4. **Monitor execution stats**: Regularly check stats to ensure jobs are running as expected
5. **Handle interrupts**: Always wrap `run()` in try-except to handle KeyboardInterrupt
6. **Consider agent limits**: Ensure your agent's `max_loops` is appropriate for scheduled tasks
7. **Log failures**: Enable verbose mode and implement proper error logging
8. **Test intervals**: Start with longer intervals and optimize based on performance
9. **Resource management**: Be mindful of API rate limits and costs with frequent scheduling
10. **Graceful shutdown**: Always call `stop()` when terminating scheduled jobs
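
One way to make the graceful-shutdown practice hard to forget is a small context manager that always calls `stop()` on exit. The `stopping` helper below is hypothetical and works with any object exposing a `stop()` method. `DummyJob` exists only to make the sketch runnable without an agent. It assumes that calling `stop()` on a job that has already stopped is harmless.

```python
from contextlib import contextmanager


@contextmanager
def stopping(job):
    """Yield the job and always call job.stop() on exit, even after an error."""
    try:
        yield job
    finally:
        job.stop()


class DummyJob:
    """Stand-in for a scheduled job; records whether stop() was called."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


job = DummyJob()
try:
    with stopping(job):
        # Simulate Ctrl+C arriving while the job is running.
        raise KeyboardInterrupt
except KeyboardInterrupt:
    print("Interrupted, job stopped:", job.stopped)
```

With a real job, the same pattern would read `with stopping(CronJob(...)) as cron: cron.run(task=...)`.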

## Thread Safety

CronJob uses threading for background execution:

* Daemon threads are used to prevent blocking program exit
* Thread-safe scheduling with the `schedule` library
* `stop()` waits up to 5 seconds for the thread to exit, then clears the schedule

## Callback Metadata

The callback function receives a metadata dictionary with:

```python theme={null}
{
    "job_id": str,              # Job identifier
    "timestamp": float,         # Unix timestamp of execution
    "execution_count": int,     # Number of times executed
    "task": str,                # The task that was run
    "kwargs": dict,             # Additional kwargs passed
    "start_time": float,        # Job start timestamp
    "is_running": bool          # Current running status
}
```

Use this metadata for:

* Logging execution history
* Conditional processing based on execution count
* Time-based analysis
* Debugging and monitoring
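
As a sketch of the first two uses, the callback factory below appends a record to an in-memory history on every run and tags every n-th output. The `make_history_callback` helper and the `every_n` parameter are hypothetical. The metadata dictionary in the demo is built by hand with only the keys the callback reads, so the block runs without an agent.

```python
import time


def make_history_callback(history: list, every_n: int = 5):
    """Build a callback that records each run and tags every n-th output."""

    def callback(output, task, metadata):
        # Logging execution history
        history.append({
            "job_id": metadata["job_id"],
            "execution_count": metadata["execution_count"],
            "timestamp": metadata["timestamp"],
            "task": task,
        })
        # Conditional processing based on execution count
        if metadata["execution_count"] % every_n == 0:
            return f"[checkpoint #{metadata['execution_count']}] {output}"
        return output

    return callback


# Simulated invocations with hand-built metadata dictionaries.
history = []
callback = make_history_callback(history, every_n=2)
outputs = []
for count in (1, 2):
    metadata = {"job_id": "demo", "timestamp": time.time(), "execution_count": count}
    outputs.append(callback("report text", "Summarize markets", metadata))
print(outputs)
```

A callback built this way can be passed as `callback=` to the constructor or attached later with `set_callback`.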
