Swarms Cloud API Client Documentation¶
Overview¶
The Swarms Cloud API Client is a production-grade Python library for interacting with the Swarms Cloud Agent API. It provides a comprehensive interface for managing, executing, and monitoring cloud-based agents.
Installation¶
Quick Start¶
from swarms_cloud import SwarmCloudAPI, AgentCreate
# Initialize the client
client = SwarmCloudAPI(
    base_url="https://swarmcloud-285321057562.us-central1.run.app",
    api_key="your_api_key_here"
)
# Create an agent
agent_data = AgentCreate(
    name="TranslateAgent",
    description="Translates text between languages",
    code="""
def main(request, store):
    text = request.payload.get('text', '')
    return f'Translated: {text}'
""",
    requirements="requests==2.25.1",
    envs="DEBUG=True"
)
new_agent = client.create_agent(agent_data)
print(f"Created agent with ID: {new_agent.id}")
Client Configuration¶
Constructor Parameters¶
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
base_url | str | No | https://swarmcloud-285321057562.us-central1.run.app | The base URL of the SwarmCloud API |
api_key | str | Yes | None | Your SwarmCloud API key |
timeout | float | No | 10.0 | Request timeout in seconds |
Data Models¶
AgentCreate¶
Model for creating new agents.
Field | Type | Required | Default | Description |
---|---|---|---|---|
name | str | Yes | - | Name of the agent |
description | str | No | None | Description of the agent's purpose |
code | str | Yes | - | Python code that defines the agent's behavior |
requirements | str | No | None | Python package requirements (pip format) |
envs | str | No | None | Environment variables for the agent |
autoscaling | bool | No | False | Enable/disable concurrent execution scaling |
AgentUpdate¶
Model for updating existing agents.
Field | Type | Required | Default | Description |
---|---|---|---|---|
name | str | No | None | Updated name of the agent |
description | str | No | None | Updated description |
code | str | No | None | Updated Python code |
requirements | str | No | None | Updated package requirements |
autoscaling | bool | No | None | Updated autoscaling setting |
API Methods¶
List Agents¶
Retrieve all available agents.
Returns: List[AgentOut]
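No call is shown for this method here, but the batch example later in this document uses `client.list_agents()`. A minimal sketch of consuming the result might look like the following. `summarize_agents` and `FakeClient` are hypothetical names introduced only so the snippet runs without network access; with the real library you would pass a `SwarmCloudAPI` instance instead. The `id` and `name` attributes are the ones used on `AgentOut` elsewhere in this document.

```python
from types import SimpleNamespace


def summarize_agents(client):
    """Return one 'id: name' line for every agent the client reports."""
    return [f"{agent.id}: {agent.name}" for agent in client.list_agents()]


class FakeClient:
    """Hypothetical stand-in for SwarmCloudAPI, used only for this demo."""

    def list_agents(self):
        return [
            SimpleNamespace(id="a1", name="TranslateAgent"),
            SimpleNamespace(id="a2", name="DataProcessor"),
        ]


for line in summarize_agents(FakeClient()):
    print(line)
```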
Create Agent¶
Create a new agent with the specified configuration.
agent_data = AgentCreate(
    name="DataProcessor",
    description="Processes incoming data streams",
    code="""
def main(request, store):
    data = request.payload.get('data', [])
    return {'processed': len(data)}
""",
    requirements="pandas==1.4.0\nnumpy==1.21.0",
    envs="PROCESSING_MODE=fast",
    autoscaling=True
)
new_agent = client.create_agent(agent_data)
Returns: AgentOut
Get Agent¶
Retrieve details of a specific agent.
Parameters:
- agent_id (str): The unique identifier of the agent
Returns: AgentOut
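This section names no method, so the sketch below assumes the lookup is exposed as `client.get_agent(agent_id)`; that name is an assumption and should be checked against the installed client. `describe_agent` and `FakeClient` are hypothetical helpers that let the snippet run offline.

```python
from types import SimpleNamespace


def describe_agent(client, agent_id):
    """Fetch one agent and return a short human-readable label."""
    # ASSUMPTION: the lookup method is named get_agent; verify this
    # against the client you have installed before relying on it.
    agent = client.get_agent(agent_id)
    return f"{agent.name} (ID: {agent.id})"


class FakeClient:
    """Hypothetical stand-in for SwarmCloudAPI so the sketch runs offline."""

    def get_agent(self, agent_id):
        return SimpleNamespace(id=agent_id, name="TranslateAgent")


print(describe_agent(FakeClient(), "agent_id_here"))
```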
Update Agent¶
Update an existing agent's configuration.
update_data = AgentUpdate(
    name="UpdatedProcessor",
    description="Enhanced data processing capabilities",
    code="def main(request, store):\n    return {'status': 'updated'}"
)
updated_agent = client.update_agent("agent_id_here", update_data)
Parameters:
- agent_id (str): The unique identifier of the agent
- update (AgentUpdate): The update data
Returns: AgentOut
Execute Agent¶
Manually execute an agent with optional payload data.
# Execute with payload
result = client.execute_agent(
    "agent_id_here",
    payload={"text": "Hello, World!"}
)
# Execute without payload
result = client.execute_agent("agent_id_here")
Parameters:
- agent_id (str): The unique identifier of the agent
- payload (Optional[Dict[str, Any]]): Execution payload data
Returns: Dict[str, Any]
Get Agent History¶
Retrieve the execution history and logs for an agent.
history = client.get_agent_history("agent_id_here")
for execution in history.executions:
    print(f"[{execution.timestamp}] {execution.log}")
Parameters:
- agent_id (str): The unique identifier of the agent
Returns: AgentExecutionHistory
Batch Execute Agents¶
Execute multiple agents simultaneously with the same payload.
# Get list of agents
agents = client.list_agents()
# Execute batch with payload
results = client.batch_execute_agents(
    agents=agents[:3],  # Execute first three agents
    payload={"data": "test"}
)
print(f"Batch execution results: {results}")
Parameters:
- agents (List[AgentOut]): List of agents to execute
- payload (Optional[Dict[str, Any]]): Shared execution payload
Returns: List[Any]
Health Check¶
Check the API's health status.
Returns: Dict[str, Any]
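The context manager example later in this document calls `client.health()`. As a rough sketch, a small wrapper can turn that call into a yes/no answer by treating any raised exception as "unhealthy". `check_health` and both fake clients are hypothetical, and the `{"status": "ok"}` payload is an assumed shape, since the response fields are not documented here.

```python
def check_health(client):
    """Return (ok, detail); ok is False when health() raises."""
    try:
        return True, client.health()
    except Exception as err:
        return False, str(err)


class HealthyClient:
    """Hypothetical stand-in that reports a healthy API."""

    def health(self):
        # ASSUMPTION: the real response fields are not documented here.
        return {"status": "ok"}


class UnreachableClient:
    """Hypothetical stand-in that simulates a network failure."""

    def health(self):
        raise ConnectionError("simulated network failure")


print(check_health(HealthyClient()))
print(check_health(UnreachableClient()))
```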
Error Handling¶
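Failed calls raise exceptions. The example below catches httpx.HTTPError for HTTP failures and falls back to a generic handler for anything else:
from swarms_cloud import SwarmCloudAPI
import httpx
# Create the client outside the try block so that close() in the
# finally clause never runs against an undefined name
client = SwarmCloudAPI(api_key="your_api_key_here")
try:
    agents = client.list_agents()
except httpx.HTTPError as http_err:
    print(f"HTTP error occurred: {http_err}")
except Exception as err:
    print(f"An unexpected error occurred: {err}")
finally:
    client.close()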
Context Manager Support¶
The client can be used with Python's context manager:
with SwarmCloudAPI(api_key="your_api_key_here") as client:
    status = client.health()
    print(f"API Status: {status}")
# Client automatically closes after the with block
Best Practices¶
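- Always close the client when finished by calling client.close().
- Use context managers (with SwarmCloudAPI(...) as client) for automatic cleanup.
- Handle errors appropriately, for example by catching httpx.HTTPError around API calls.
- Set appropriate timeouts for your use case with the timeout constructor parameter.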
Complete Example¶
Here's a complete example showcasing various features of the client:
from swarms_cloud import SwarmCloudAPI, AgentCreate, AgentUpdate
import httpx
def main():
    with SwarmCloudAPI(api_key="your_api_key_here") as client:
        # Create an agent
        agent_data = AgentCreate(
            name="DataAnalyzer",
            description="Analyzes incoming data streams",
            code="""
def main(request, store):
    data = request.payload.get('data', [])
    return {
        'count': len(data),
        'summary': 'Data processed successfully'
    }
""",
            requirements="pandas==1.4.0",
            autoscaling=True
        )
        try:
            # Create the agent
            new_agent = client.create_agent(agent_data)
            print(f"Created agent: {new_agent.name} (ID: {new_agent.id})")
            # Execute the agent
            result = client.execute_agent(
                new_agent.id,
                payload={"data": [1, 2, 3, 4, 5]}
            )
            print(f"Execution result: {result}")
            # Update the agent
            update_data = AgentUpdate(
                description="Enhanced data analysis capabilities"
            )
            updated_agent = client.update_agent(new_agent.id, update_data)
            print(f"Updated agent: {updated_agent.name}")
            # Get execution history
            history = client.get_agent_history(new_agent.id)
            print(f"Execution history: {history}")
        except httpx.HTTPError as e:
            print(f"HTTP error occurred: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")
if __name__ == "__main__":
    main()
Logging¶
The client uses the loguru library for logging. You can configure the logging level and format:
from loguru import logger
from swarms_cloud import SwarmCloudAPI
# Write log output to a file, rotating it once it reaches 500 MB
logger.add("swarmcloud.log", rotation="500 MB")
client = SwarmCloudAPI(api_key="your_api_key_here")
Performance Considerations¶
- Connection Reuse: The client reuses HTTP connections by default, improving performance for multiple requests.
- Timeout Configuration: Set appropriate timeouts based on your use case through the timeout constructor parameter (default 10.0 seconds).
- Batch Operations: Use batch_execute_agents when executing multiple agents with the same payload.
Rate Limiting¶
The client respects API rate limits but does not implement retry logic. Implement your own retry mechanism if needed:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def execute_with_retry(client, agent_id, payload):
    return client.execute_agent(agent_id, payload)
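If adding tenacity as a dependency is not an option, the same idea can be sketched with the standard library alone. The helper below is an illustration, not part of the client: `execute_with_backoff` and `FlakyClient` are hypothetical names, the delays (1s, 2s, ...) are arbitrary, and the `sleep` parameter exists only so the waiting can be swapped out in tests.

```python
import time


def execute_with_backoff(client, agent_id, payload=None, attempts=3,
                         base_delay=1.0, sleep=time.sleep):
    """Call client.execute_agent, retrying failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return client.execute_agent(agent_id, payload)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # wait 1s, then 2s, then 4s, ...


class FlakyClient:
    """Hypothetical stand-in that fails twice, then succeeds."""

    def __init__(self):
        self.calls = 0

    def execute_agent(self, agent_id, payload=None):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("simulated rate-limit response")
        return {"agent_id": agent_id, "payload": payload}


delays = []  # records requested waits instead of actually sleeping
client = FlakyClient()
result = execute_with_backoff(client, "agent_id_here", {"text": "hi"},
                              sleep=delays.append)
print(result, delays)
```

In real use you would likely narrow the `except` clause to the errors worth retrying (for example `httpx.HTTPError`) rather than retrying on every exception.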
Thread Safety¶
The client is not thread-safe by default. For concurrent usage, create separate client instances for each thread or implement appropriate synchronization mechanisms.
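One way to follow the "separate client per thread" advice is to keep each client in thread-local storage, so every worker thread lazily builds its own instance. This is a sketch under stated assumptions: `get_thread_client`, `run`, and `FakeClient` are hypothetical names, and with the real library the factory would be something like `lambda: SwarmCloudAPI(api_key=...)`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()  # each thread sees its own attributes on this object


def get_thread_client(factory):
    """Return the calling thread's client, creating it on first use."""
    if not hasattr(_local, "client"):
        _local.client = factory()
    return _local.client


class FakeClient:
    """Hypothetical stand-in for SwarmCloudAPI so the sketch runs offline."""

    def execute_agent(self, agent_id, payload=None):
        return {"agent_id": agent_id, "thread": threading.get_ident()}


def run(agent_id):
    client = get_thread_client(FakeClient)
    outcome = client.execute_agent(agent_id)
    return id(client), outcome["thread"]


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(run, ["a1", "a2", "a3", "a4", "a5", "a6"]))

print(f"{len(results)} executions across {len({t for _, t in results})} thread(s)")
```

Closing the per-thread clients is left out of this sketch; in a long-running service each thread should call close() on its client before it exits.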