Swarms Cloud API Client Documentation¶
Introduction¶
The Swarms Cloud API client is a production-grade Python package for interacting with the Swarms API. It provides both synchronous and asynchronous interfaces, making it suitable for a wide range of applications from simple scripts to high-performance, scalable services.
Key features include: - Connection pooling and efficient session management - Automatic retries with exponential backoff - Circuit breaker pattern for improved reliability - In-memory caching for frequently accessed resources - Comprehensive error handling with detailed exceptions - Full support for asynchronous operations - Type checking with Pydantic
This documentation covers all available client methods with detailed descriptions, parameter references, and usage examples.
Installation¶
Authentication¶
To use the Swarms API, you need an API key. You can obtain your API key from the Swarms Platform API Keys page.
Client Initialization¶
The SwarmsClient
is the main entry point for interacting with the Swarms API. It can be initialized with various configuration options to customize its behavior.
from swarms_client import SwarmsClient
# Initialize with default settings
client = SwarmsClient(api_key="your-api-key")
# Or with custom settings
client = SwarmsClient(
api_key="your-api-key",
base_url="https://swarms-api-285321057562.us-east1.run.app",
timeout=60,
max_retries=3,
retry_delay=1,
log_level="INFO",
pool_connections=100,
pool_maxsize=100,
keep_alive_timeout=5,
max_concurrent_requests=100,
circuit_breaker_threshold=5,
circuit_breaker_timeout=60,
enable_cache=True
)
Parameters¶
Parameter | Type | Default | Description |
---|---|---|---|
api_key |
str |
Environment variable SWARMS_API_KEY |
API key for authentication |
base_url |
str |
"https://swarms-api-285321057562.us-east1.run.app" |
Base URL for the API |
timeout |
int |
60 |
Timeout for API requests in seconds |
max_retries |
int |
3 |
Maximum number of retry attempts for failed requests |
retry_delay |
int |
1 |
Initial delay between retries in seconds (uses exponential backoff) |
log_level |
str |
"INFO" |
Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) |
pool_connections |
int |
100 |
Number of connection pools to cache |
pool_maxsize |
int |
100 |
Maximum number of connections to save in the pool |
keep_alive_timeout |
int |
5 |
Keep-alive timeout for connections in seconds |
max_concurrent_requests |
int |
100 |
Maximum number of concurrent requests |
circuit_breaker_threshold |
int |
5 |
Failure threshold for the circuit breaker |
circuit_breaker_timeout |
int |
60 |
Reset timeout for the circuit breaker in seconds |
enable_cache |
bool |
True |
Whether to enable in-memory caching |
Client Methods¶
clear_cache¶
Clears the in-memory cache used for caching API responses.
Agent Resource¶
The Agent resource provides methods for creating and managing agent completions.
create¶
Creates an agent completion.
response = client.agent.create(
agent_config={
"agent_name": "Researcher",
"description": "Conducts in-depth research on topics",
"model_name": "gpt-4o",
"temperature": 0.7
},
task="Research the latest advancements in quantum computing and summarize the key findings"
)
print(f"Agent ID: {response.id}")
print(f"Output: {response.outputs}")
Parameters¶
Parameter | Type | Required | Description |
---|---|---|---|
agent_config |
dict or AgentSpec |
Yes | Configuration for the agent |
task |
str |
Yes | The task for the agent to complete |
history |
dict |
No | Optional conversation history |
The agent_config
parameter can include the following fields:
Field | Type | Default | Description |
---|---|---|---|
agent_name |
str |
Required | Name of the agent |
description |
str |
None |
Description of the agent's purpose |
system_prompt |
str |
None |
System prompt to guide the agent's behavior |
model_name |
str |
"gpt-4o-mini" |
Name of the model to use |
auto_generate_prompt |
bool |
False |
Whether to automatically generate a prompt |
max_tokens |
int |
8192 |
Maximum tokens in the response |
temperature |
float |
0.5 |
Temperature for sampling (0-1) |
role |
str |
None |
Role of the agent |
max_loops |
int |
1 |
Maximum number of reasoning loops |
tools_dictionary |
List[Dict] |
None |
Tools available to the agent |
Returns¶
AgentCompletionResponse
object with the following properties:
id
: Unique identifier for the completionsuccess
: Whether the completion was successfulname
: Name of the agentdescription
: Description of the agenttemperature
: Temperature used for the completionoutputs
: Output from the agentusage
: Token usage informationtimestamp
: Timestamp of the completion
create_batch¶
Creates multiple agent completions in batch.
responses = client.agent.create_batch([
{
"agent_config": {
"agent_name": "Researcher",
"model_name": "gpt-4o-mini",
"temperature": 0.5
},
"task": "Summarize the latest quantum computing research"
},
{
"agent_config": {
"agent_name": "Writer",
"model_name": "gpt-4o",
"temperature": 0.7
},
"task": "Write a blog post about AI safety"
}
])
for i, response in enumerate(responses):
print(f"Agent {i+1} ID: {response.id}")
print(f"Output: {response.outputs}")
print("---")
Parameters¶
Parameter | Type | Required | Description |
---|---|---|---|
completions |
List[Dict or AgentCompletion] |
Yes | List of agent completion requests |
Each item in the completions
list should have the same structure as the parameters for the create
method.
Returns¶
List of AgentCompletionResponse
objects with the same properties as the return value of the create
method.
acreate¶
Creates an agent completion asynchronously.
import asyncio
from swarms_client import SwarmsClient
async def main():
async with SwarmsClient(api_key="your-api-key") as client:
response = await client.agent.acreate(
agent_config={
"agent_name": "Researcher",
"description": "Conducts in-depth research",
"model_name": "gpt-4o"
},
task="Research the impact of quantum computing on cryptography"
)
print(f"Agent ID: {response.id}")
print(f"Output: {response.outputs}")
asyncio.run(main())
Parameters¶
Same as the create
method.
Returns¶
Same as the create
method.
acreate_batch¶
Creates multiple agent completions in batch asynchronously.
import asyncio
from swarms_client import SwarmsClient
async def main():
async with SwarmsClient(api_key="your-api-key") as client:
responses = await client.agent.acreate_batch([
{
"agent_config": {
"agent_name": "Researcher",
"model_name": "gpt-4o-mini"
},
"task": "Summarize the latest quantum computing research"
},
{
"agent_config": {
"agent_name": "Writer",
"model_name": "gpt-4o"
},
"task": "Write a blog post about AI safety"
}
])
for i, response in enumerate(responses):
print(f"Agent {i+1} ID: {response.id}")
print(f"Output: {response.outputs}")
print("---")
asyncio.run(main())
Parameters¶
Same as the create_batch
method.
Returns¶
Same as the create_batch
method.
Swarm Resource¶
The Swarm resource provides methods for creating and managing swarm completions.
create¶
Creates a swarm completion.
response = client.swarm.create(
name="Research Swarm",
description="A swarm for research tasks",
swarm_type="SequentialWorkflow",
task="Research quantum computing advances in 2024 and summarize the key findings",
agents=[
{
"agent_name": "Researcher",
"description": "Conducts in-depth research",
"model_name": "gpt-4o",
"temperature": 0.5
},
{
"agent_name": "Critic",
"description": "Evaluates arguments for flaws",
"model_name": "gpt-4o-mini",
"temperature": 0.3
}
],
max_loops=3,
return_history=True
)
print(f"Job ID: {response.job_id}")
print(f"Status: {response.status}")
print(f"Output: {response.output}")
Parameters¶
Parameter | Type | Required | Description |
---|---|---|---|
name |
str |
No | Name of the swarm |
description |
str |
No | Description of the swarm |
agents |
List[Dict or AgentSpec] |
No | List of agent specifications |
max_loops |
int |
No | Maximum number of loops (default: 1) |
swarm_type |
str |
No | Type of swarm (see available types) |
task |
str |
Conditional | The task to complete (required if tasks and messages are not provided) |
tasks |
List[str] |
Conditional | List of tasks for batch processing (required if task and messages are not provided) |
messages |
List[Dict] |
Conditional | List of messages to process (required if task and tasks are not provided) |
return_history |
bool |
No | Whether to return the execution history (default: True) |
rules |
str |
No | Rules for the swarm |
schedule |
Dict |
No | Schedule specification for delayed execution |
stream |
bool |
No | Whether to stream the response (default: False) |
service_tier |
str |
No | Service tier ('standard' or 'flex', default: 'standard') |
Returns¶
SwarmCompletionResponse
object with the following properties:
job_id
: Unique identifier for the jobstatus
: Status of the jobswarm_name
: Name of the swarmdescription
: Description of the swarmswarm_type
: Type of swarm usedoutput
: Output from the swarmnumber_of_agents
: Number of agents in the swarmservice_tier
: Service tier usedtasks
: List of tasks processed (if applicable)messages
: List of messages processed (if applicable)
create_batch¶
Creates multiple swarm completions in batch.
responses = client.swarm.create_batch([
{
"name": "Research Swarm",
"swarm_type": "auto",
"task": "Research quantum computing advances",
"agents": [
{"agent_name": "Researcher", "model_name": "gpt-4o"}
]
},
{
"name": "Writing Swarm",
"swarm_type": "SequentialWorkflow",
"task": "Write a blog post about AI safety",
"agents": [
{"agent_name": "Writer", "model_name": "gpt-4o"},
{"agent_name": "Editor", "model_name": "gpt-4o-mini"}
]
}
])
for i, response in enumerate(responses):
print(f"Swarm {i+1} Job ID: {response.job_id}")
print(f"Status: {response.status}")
print(f"Output: {response.output}")
print("---")
Parameters¶
Parameter | Type | Required | Description |
---|---|---|---|
swarms |
List[Dict or SwarmSpec] |
Yes | List of swarm specifications |
Each item in the swarms
list should have the same structure as the parameters for the create
method.
Returns¶
List of SwarmCompletionResponse
objects with the same properties as the return value of the create
method.
list_types¶
Lists available swarm types.
response = client.swarm.list_types()
print(f"Available swarm types:")
for swarm_type in response.swarm_types:
print(f"- {swarm_type}")
Returns¶
SwarmTypesResponse
object with the following properties:
success
: Whether the request was successfulswarm_types
: List of available swarm types
alist_types¶
Lists available swarm types asynchronously.
import asyncio
from swarms_client import SwarmsClient
async def main():
async with SwarmsClient(api_key="your-api-key") as client:
response = await client.swarm.alist_types()
print(f"Available swarm types:")
for swarm_type in response.swarm_types:
print(f"- {swarm_type}")
asyncio.run(main())
Returns¶
Same as the list_types
method.
acreate¶
Creates a swarm completion asynchronously.
import asyncio
from swarms_client import SwarmsClient
async def main():
async with SwarmsClient(api_key="your-api-key") as client:
response = await client.swarm.acreate(
name="Research Swarm",
swarm_type="SequentialWorkflow",
task="Research quantum computing advances in 2024",
agents=[
{
"agent_name": "Researcher",
"description": "Conducts in-depth research",
"model_name": "gpt-4o"
},
{
"agent_name": "Critic",
"description": "Evaluates arguments for flaws",
"model_name": "gpt-4o-mini"
}
]
)
print(f"Job ID: {response.job_id}")
print(f"Status: {response.status}")
print(f"Output: {response.output}")
asyncio.run(main())
Parameters¶
Same as the create
method.
Returns¶
Same as the create
method.
acreate_batch¶
Creates multiple swarm completions in batch asynchronously.
import asyncio
from swarms_client import SwarmsClient
async def main():
async with SwarmsClient(api_key="your-api-key") as client:
responses = await client.swarm.acreate_batch([
{
"name": "Research Swarm",
"swarm_type": "auto",
"task": "Research quantum computing",
"agents": [
{"agent_name": "Researcher", "model_name": "gpt-4o"}
]
},
{
"name": "Writing Swarm",
"swarm_type": "SequentialWorkflow",
"task": "Write a blog post about AI safety",
"agents": [
{"agent_name": "Writer", "model_name": "gpt-4o"}
]
}
])
for i, response in enumerate(responses):
print(f"Swarm {i+1} Job ID: {response.job_id}")
print(f"Status: {response.status}")
print(f"Output: {response.output}")
print("---")
asyncio.run(main())
Parameters¶
Same as the create_batch
method.
Returns¶
Same as the create_batch
method.
Models Resource¶
The Models resource provides methods for retrieving information about available models.
list¶
Lists available models.
response = client.models.list()
print(f"Available models:")
for model in response.models:
print(f"- {model}")
Returns¶
ModelsResponse
object with the following properties:
success
: Whether the request was successfulmodels
: List of available model names
alist¶
Lists available models asynchronously.
import asyncio
from swarms_client import SwarmsClient
async def main():
async with SwarmsClient(api_key="your-api-key") as client:
response = await client.models.alist()
print(f"Available models:")
for model in response.models:
print(f"- {model}")
asyncio.run(main())
Returns¶
Same as the list
method.
Logs Resource¶
The Logs resource provides methods for retrieving API request logs.
list¶
Lists API request logs.
response = client.logs.list()
print(f"Found {response.count} logs:")
for log in response.logs:
print(f"- ID: {log.id}, Created at: {log.created_at}")
print(f" Data: {log.data}")
Returns¶
LogsResponse
object with the following properties:
status
: Status of the requestcount
: Number of logslogs
: List of log entriestimestamp
: Timestamp of the request
Each log entry is a LogEntry
object with the following properties:
id
: Unique identifier for the log entryapi_key
: API key used for the requestdata
: Request datacreated_at
: Timestamp when the log entry was created
alist¶
Lists API request logs asynchronously.
import asyncio
from swarms_client import SwarmsClient
async def main():
async with SwarmsClient() as client:
response = await client.logs.alist()
print(f"Found {response.count} logs:")
for log in response.logs:
print(f"- ID: {log.id}, Created at: {log.created_at}")
print(f" Data: {log.data}")
asyncio.run(main())
Returns¶
Same as the list
method.
Error Handling¶
The Swarms API client provides detailed error handling with specific exception types for different error scenarios. All exceptions inherit from the base SwarmsError
class.
from swarms_client import SwarmsClient, SwarmsError, AuthenticationError, RateLimitError, APIError
try:
client = SwarmsClient(api_key="invalid-api-key")
response = client.agent.create(
agent_config={"agent_name": "Researcher", "model_name": "gpt-4o"},
task="Research quantum computing"
)
except AuthenticationError as e:
print(f"Authentication error: {e}")
except RateLimitError as e:
print(f"Rate limit exceeded: {e}")
except APIError as e:
print(f"API error: {e}")
except SwarmsError as e:
print(f"Swarms error: {e}")
Exception Types¶
Exception | Description |
---|---|
SwarmsError |
Base exception for all Swarms API errors |
AuthenticationError |
Raised when there's an issue with authentication |
RateLimitError |
Raised when the rate limit is exceeded |
APIError |
Raised when the API returns an error |
InvalidRequestError |
Raised when the request is invalid |
InsufficientCreditsError |
Raised when the user doesn't have enough credits |
TimeoutError |
Raised when a request times out |
NetworkError |
Raised when there's a network issue |
Advanced Features¶
Connection Pooling¶
The Swarms API client uses connection pooling to efficiently manage HTTP connections, which can significantly improve performance when making multiple requests.
client = SwarmsClient(
api_key="your-api-key",
pool_connections=100, # Number of connection pools to cache
pool_maxsize=100, # Maximum number of connections to save in the pool
keep_alive_timeout=5 # Keep-alive timeout for connections in seconds
)
Circuit Breaker Pattern¶
The client implements the circuit breaker pattern to prevent cascading failures when the API is experiencing issues.
client = SwarmsClient(
api_key="your-api-key",
circuit_breaker_threshold=5, # Number of failures before the circuit opens
circuit_breaker_timeout=60 # Time in seconds before attempting to close the circuit
)
Caching¶
The client includes in-memory caching for frequently accessed resources to reduce API calls and improve performance.
client = SwarmsClient(
api_key="your-api-key",
enable_cache=True # Enable in-memory caching
)
# Clear the cache manually if needed
client.clear_cache()
Complete Example¶
Here's a complete example that demonstrates how to use the Swarms API client to create a research swarm and process its output:
import os
from swarms_client import SwarmsClient
from dotenv import load_dotenv
# Load API key from environment
load_dotenv()
api_key = os.getenv("SWARMS_API_KEY")
# Initialize client
client = SwarmsClient(api_key=api_key)
# Create a research swarm
try:
# Define the agents
researcher = {
"agent_name": "Researcher",
"description": "Conducts thorough research on specified topics",
"model_name": "gpt-4o",
"temperature": 0.5,
"system_prompt": "You are a diligent researcher focused on finding accurate and comprehensive information."
}
analyst = {
"agent_name": "Analyst",
"description": "Analyzes research findings and identifies key insights",
"model_name": "gpt-4o",
"temperature": 0.3,
"system_prompt": "You are an insightful analyst who can identify patterns and extract meaningful insights from research data."
}
summarizer = {
"agent_name": "Summarizer",
"description": "Creates concise summaries of complex information",
"model_name": "gpt-4o-mini",
"temperature": 0.4,
"system_prompt": "You specialize in distilling complex information into clear, concise summaries."
}
# Create the swarm
response = client.swarm.create(
name="Quantum Computing Research Swarm",
description="A swarm for researching and analyzing quantum computing advancements",
swarm_type="SequentialWorkflow",
task="Research the latest advancements in quantum computing in 2024, analyze their potential impact on cryptography and data security, and provide a concise summary of the findings.",
agents=[researcher, analyst, summarizer],
max_loops=2,
return_history=True
)
# Process the response
print(f"Job ID: {response.job_id}")
print(f"Status: {response.status}")
print(f"Number of agents: {response.number_of_agents}")
print(f"Swarm type: {response.swarm_type}")
# Print the output
if "final_output" in response.output:
print("\nFinal Output:")
print(response.output["final_output"])
else:
print("\nOutput:")
print(response.output)
# Access agent-specific outputs if available
if "agent_outputs" in response.output:
print("\nAgent Outputs:")
for agent, output in response.output["agent_outputs"].items():
print(f"\n{agent}:")
print(output)
except Exception as e:
print(f"Error: {e}")
This example creates a sequential workflow swarm with three agents to research quantum computing, analyze the findings, and create a summary of the results.