# AutoGPT Advanced Usage Guide
## Custom block development
### Block structure
```python
from collections.abc import AsyncGenerator
from typing import Any

from backend.data.block import Block, BlockType
from pydantic import BaseModel


class MyBlockInput(BaseModel):
    """Input schema for the block."""
    query: str
    max_results: int = 10


class MyBlockOutput(BaseModel):
    """Output schema for the block."""
    results: list[str]
    count: int


class MyCustomBlock(Block):
    """Custom block for specific functionality."""

    id = "my-custom-block-uuid"
    name = "My Custom Block"
    description = "Does something specific"
    block_type = BlockType.STANDARD
    input_schema = MyBlockInput
    output_schema = MyBlockOutput

    async def execute(self, input_data: MyBlockInput) -> AsyncGenerator[tuple[str, Any], None]:
        """Execute the block logic, yielding (output_name, value) pairs."""
        results = await self.process(input_data.query, input_data.max_results)
        yield "results", results
        yield "count", len(results)

    async def process(self, query: str, max_results: int) -> list[str]:
        """Internal processing logic."""
        # Implementation goes here
        return ["result1", "result2"]
```
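Because `execute` is an async generator, callers consume it by iterating the yielded `(output_name, value)` pairs. A minimal driver for exercising a block outside the platform might look like this (block construction is elided, since it depends on your `Block` base class):

```python
async def run_block_once(block: MyCustomBlock, input_data: MyBlockInput) -> dict:
    """Collect every yielded output into a dict keyed by output name."""
    outputs = {}
    async for name, value in block.execute(input_data):
        outputs[name] = value
    return outputs
```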
### Block registration
```python
# backend/blocks/__init__.py
from backend.blocks.my_block import MyCustomBlock

# Add to the block registry
BLOCKS = [
    MyCustomBlock,
    # ... other blocks
]
```
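Depending on your platform version, modules under `backend/blocks/` may also be discovered automatically at import time; check how the `__init__.py` in your checkout loads blocks before maintaining a manual registry.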
### Block with credentials
```python
from backend.data.block import Block
from backend.integrations.providers import ProviderName
from openai import AsyncOpenAI


class APIIntegrationBlock(Block):
    """Block that uses external API credentials."""

    credentials_required = [ProviderName.OPENAI]

    async def execute(self, input_data):
        # Fetch credentials managed by the platform
        credentials = await self.get_credentials(ProviderName.OPENAI)
        # Use the async client so the awaited call below is valid
        client = AsyncOpenAI(api_key=credentials.api_key)
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": input_data.prompt}],
        )
        yield "response", response.choices[0].message.content
```
### Block with cost tracking
```python
from backend.data.block import Block
from backend.data.block_cost_config import BlockCostConfig


class LLMBlock(Block):
    """Block with cost tracking."""

    cost_config = BlockCostConfig(
        cost_type="token",
        cost_per_unit=0.00002,  # Per token
        provider="openai",
    )

    async def execute(self, input_data):
        response = await self.call_llm(input_data.prompt)
        # Report token usage for cost tracking
        self.report_usage(
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
        )
        yield "output", response.content
```
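At the configured rate of 0.00002 per token, a call consuming 1,000 prompt tokens and 500 completion tokens would be charged 1,500 × 0.00002 = 0.03 in whatever unit the platform bills.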
## Advanced execution patterns
### Parallel node execution
```python
import asyncio

from backend.executor.manager import ExecutionManager


async def execute_parallel_nodes(graph_exec_id: str, node_ids: list[str]):
    """Execute multiple nodes in parallel."""
    manager = ExecutionManager()
    tasks = [
        manager.execute_node(graph_exec_id, node_id)
        for node_id in node_ids
    ]
    results = await asyncio.gather(*tasks)
    return results
```
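By default `asyncio.gather` propagates the first exception raised, discarding the other results. If one failing node should not abort the whole batch, `return_exceptions=True` collects failures alongside successes; a minimal variant, reusing the `ExecutionManager` from the sketch above:

```python
import asyncio

async def execute_parallel_nodes_tolerant(graph_exec_id: str, node_ids: list[str]):
    """Like execute_parallel_nodes, but isolates per-node failures."""
    manager = ExecutionManager()
    results = await asyncio.gather(
        *(manager.execute_node(graph_exec_id, node_id) for node_id in node_ids),
        return_exceptions=True,  # exceptions are returned in place of results
    )
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return succeeded, failed
```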
### Conditional branching
```python
from backend.blocks.branching import BranchingBlock


class SmartBranchBlock(BranchingBlock):
    """Advanced conditional branching."""

    async def execute(self, input_data):
        condition = await self.evaluate_condition(input_data)
        if condition == "path_a":
            yield "output_a", input_data.value
        elif condition == "path_b":
            yield "output_b", input_data.value
        else:
            yield "output_default", input_data.value
```
### Loop execution
```python
from backend.data.block import Block


class LoopBlock(Block):
    """Execute a subgraph in a loop."""

    async def execute(self, input_data):
        items = input_data.items
        results = []
        for i, item in enumerate(items):
            # Execute the nested graph once per item
            result = await self.execute_subgraph(
                graph_id=input_data.subgraph_id,
                inputs={"item": item, "index": i},
            )
            results.append(result)
            yield "progress", f"Processed {i + 1}/{len(items)}"
        yield "results", results
```
## Graph composition
### Nested agents
```python
from backend.data.block import Block
# The built-in AgentExecutorBlock (backend.blocks.agent) covers the common
# case; this sketch shows a custom parent block instead.


class ParentAgentBlock(Block):
    """Execute child agents within a parent agent."""

    async def execute(self, input_data):
        # Run the child agent to completion
        child_result = await self.execute_agent(
            agent_id=input_data.child_agent_id,
            inputs={"query": input_data.query},
        )
        # Post-process the child's output
        processed = await self.process_result(child_result)
        yield "output", processed
```
### Dynamic graph construction
```python
from backend.data.graph import GraphModel, NodeModel, LinkModel


async def create_dynamic_graph(user_id: str, template: str):
    """Create a graph dynamically based on a template."""
    graph = GraphModel(
        name=f"Dynamic Graph - {template}",
        description="Auto-generated graph",
        user_id=user_id,
    )
    # Add nodes based on the template
    nodes = []
    if template == "research":
        nodes = [
            NodeModel(block_id="search-block", position={"x": 0, "y": 0}),
            NodeModel(block_id="summarize-block", position={"x": 200, "y": 0}),
            NodeModel(block_id="output-block", position={"x": 400, "y": 0}),
        ]
    elif template == "code-review":
        nodes = [
            NodeModel(block_id="github-block", position={"x": 0, "y": 0}),
            NodeModel(block_id="review-block", position={"x": 200, "y": 0}),
            NodeModel(block_id="comment-block", position={"x": 400, "y": 0}),
        ]
    graph.nodes = nodes
    # Link each node's "output" pin to the next node's "input" pin
    graph.links = [
        LinkModel(
            source_id=nodes[i].id,
            sink_id=nodes[i + 1].id,
            source_name="output",
            sink_name="input",
        )
        for i in range(len(nodes) - 1)
    ]
    return await graph.save()
```
## Production deployment
### Kubernetes deployment
```yaml
# autogpt-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: autogpt-backend
spec:
  replicas: 3
  selector:
    matchLabels:
      app: autogpt-backend
  template:
    metadata:
      labels:
        app: autogpt-backend
    spec:
      containers:
        - name: rest-server
          image: autogpt/platform-backend:latest
          command: ["poetry", "run", "rest"]
          ports:
            - containerPort: 8006
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: autogpt-secrets
                  key: database-url
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: autogpt-executor
spec:
  replicas: 5
  selector:
    matchLabels:
      app: autogpt-executor
  template:
    metadata:
      labels:
        app: autogpt-executor
    spec:
      containers:
        - name: executor
          image: autogpt/platform-backend:latest
          command: ["poetry", "run", "executor"]
          resources:
            requests:
              memory: "1Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "4000m"
```
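A production deployment would normally also define liveness and readiness probes so Kubernetes can restart hung pods and withhold traffic until the REST server is up; the probe endpoints depend on your build, so they are omitted here.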
### Horizontal scaling
```yaml
# autogpt-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: autogpt-executor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: autogpt-executor
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: External
      external:
        metric:
          name: rabbitmq_queue_messages
          selector:
            matchLabels:
              queue: graph-execution
        target:
          type: AverageValue
          averageValue: "10"
```
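The `External` metric above is only visible to the HPA if something serves it through the Kubernetes External Metrics API, for example prometheus-adapter or KEDA scraping RabbitMQ; a bare cluster exposes only the `Resource` metrics.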
### Database optimization
```sql
-- Optimize for high-volume execution tracking
CREATE INDEX CONCURRENTLY idx_node_exec_graph_status
    ON "AgentNodeExecution" ("graphExecutionId", "executionStatus");

CREATE INDEX CONCURRENTLY idx_graph_exec_user_status
    ON "AgentGraphExecution" ("userId", "executionStatus", "createdAt" DESC);

-- Partition execution tables by date
CREATE TABLE "AgentGraphExecution_partitioned" (
    LIKE "AgentGraphExecution" INCLUDING ALL
) PARTITION BY RANGE ("createdAt");

-- Create monthly partitions
CREATE TABLE "AgentGraphExecution_2024_01"
    PARTITION OF "AgentGraphExecution_partitioned"
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
```
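Note that `CREATE INDEX CONCURRENTLY` cannot run inside a transaction block, so these statements belong in a manual maintenance script rather than an ordinary transactional migration.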
## Monitoring and observability
### Prometheus metrics
```python
import time

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
EXECUTIONS_TOTAL = Counter(
    'autogpt_executions_total',
    'Total graph executions',
    ['graph_id', 'status']
)
EXECUTION_DURATION = Histogram(
    'autogpt_execution_duration_seconds',
    'Execution duration in seconds',
    ['graph_id'],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 120]
)
ACTIVE_EXECUTIONS = Gauge(
    'autogpt_active_executions',
    'Currently running executions'
)


# Use in the executor
class ExecutionManager:
    async def execute_graph(self, graph_id, inputs):
        ACTIVE_EXECUTIONS.inc()
        start_time = time.time()
        try:
            result = await self._execute(graph_id, inputs)
            EXECUTIONS_TOTAL.labels(graph_id=graph_id, status='success').inc()
            return result
        except Exception:
            EXECUTIONS_TOTAL.labels(graph_id=graph_id, status='failed').inc()
            raise
        finally:
            ACTIVE_EXECUTIONS.dec()
            EXECUTION_DURATION.labels(graph_id=graph_id).observe(
                time.time() - start_time
            )
```
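These metrics still need to be exposed somewhere Prometheus can scrape them. The simplest option in `prometheus_client` is a standalone metrics endpoint (the port here is an arbitrary example):

```python
from prometheus_client import start_http_server

# Serves /metrics on the given port in a background thread
start_http_server(8000)
```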
### Grafana dashboard
```json
{
  "dashboard": {
    "title": "AutoGPT Platform",
    "panels": [
      {
        "title": "Executions per Minute",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(autogpt_executions_total[1m])",
            "legendFormat": "{{status}}"
          }
        ]
      },
      {
        "title": "Execution Latency (p95)",
        "type": "gauge",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(autogpt_execution_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Active Executions",
        "type": "stat",
        "targets": [
          {"expr": "autogpt_active_executions"}
        ]
      }
    ]
  }
}
```
### Sentry error tracking
```python
import os

import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.asyncio import AsyncioIntegration

sentry_sdk.init(
    dsn=os.environ.get("SENTRY_DSN"),
    integrations=[
        FastApiIntegration(),
        AsyncioIntegration(),
    ],
    traces_sample_rate=0.1,
    profiles_sample_rate=0.1,
    environment=os.environ.get("APP_ENV", "development"),
)

# Attach custom context when reporting an error
with sentry_sdk.push_scope() as scope:
    scope.set_tag("graph_id", graph_id)
    scope.set_extra("inputs", sanitized_inputs)
    sentry_sdk.capture_exception(error)
```
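If raw block inputs can contain secrets or user data, a `before_send` hook can scrub events before they leave the process. Exactly what to strip depends on your payloads; the `inputs` extra removed here is just the field used in the example above:

```python
import os

import sentry_sdk

def scrub_event(event, hint):
    # Remove potentially sensitive extras before the event is sent
    event.get("extra", {}).pop("inputs", None)
    return event

sentry_sdk.init(
    dsn=os.environ.get("SENTRY_DSN"),
    before_send=scrub_event,
)
```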
## API integration patterns
### Webhook handling
```python
from fastapi import APIRouter, HTTPException, Request

from backend.data.webhook import WebhookHandler

router = APIRouter()


@router.post("/webhooks/{webhook_id}")
async def handle_webhook(webhook_id: str, request: Request):
    """Handle an incoming webhook."""
    handler = WebhookHandler()
    # Verify the webhook signature against the raw request body
    signature = request.headers.get("X-Webhook-Signature")
    body = await request.body()
    if not await handler.verify_signature(webhook_id, signature, body):
        # Raising HTTPException actually sets the 401 status code;
        # returning a (dict, int) tuple from FastAPI does not.
        raise HTTPException(status_code=401, detail="Invalid signature")
    # Parse the payload and trigger the associated graph
    payload = await request.json()
    execution = await handler.trigger_graph(webhook_id, payload)
    return {
        "execution_id": execution.id,
        "status": "queued",
    }
```
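This assumes `WebhookHandler.verify_signature` exists. If you need to implement it yourself, the usual scheme is an HMAC over the raw body compared in constant time; a minimal sketch, with secret lookup left abstract:

```python
import hashlib
import hmac

def verify_hmac_signature(secret: bytes, body: bytes, signature: str | None) -> bool:
    """Compare an HMAC-SHA256 hex digest of the body in constant time."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature or "")
```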
### External API rate limiting
```python
import asyncio
import time


class RateLimiter:
    """Rate limiter for external API calls."""

    def __init__(self, max_concurrent: int = 10, rate_per_second: float = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate = rate_per_second
        self.last_call = 0.0

    async def acquire(self):
        await self.semaphore.acquire()
        # Use a monotonic clock so wall-clock adjustments can't skew pacing
        now = time.monotonic()
        wait_time = max(0.0, (1 / self.rate) - (now - self.last_call))
        if wait_time > 0:
            await asyncio.sleep(wait_time)
        self.last_call = time.monotonic()

    def release(self):
        self.semaphore.release()


# Usage in a block
class RateLimitedAPIBlock(Block):
    rate_limiter = RateLimiter(max_concurrent=5, rate_per_second=2)

    async def execute(self, input_data):
        await self.rate_limiter.acquire()
        try:
            result = await self.call_api(input_data)
            yield "output", result
        finally:
            self.rate_limiter.release()
```
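One caveat with this sketch: because `acquire` awaits between reading and updating `last_call`, several concurrent callers can pass the spacing check together, so the rate is only approximate. If strict pacing matters, serialize the check with a lock (reusing the `RateLimiter` above):

```python
class StrictRateLimiter(RateLimiter):
    """Variant that makes the inter-call spacing strict by serializing acquire."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._lock = asyncio.Lock()

    async def acquire(self):
        await self.semaphore.acquire()
        async with self._lock:
            # Only one coroutine at a time can read and update last_call
            now = time.monotonic()
            wait_time = max(0.0, (1 / self.rate) - (now - self.last_call))
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self.last_call = time.monotonic()
```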
Source: claude-code-templates (MIT).