Overview
This example demonstrates how to create actionable AI agents:- Multi-service integration - Slack notifications, JIRA tickets, deployments
- Action routing - Dispatch different actions based on agent requests
- Bulk operations - Process multiple agent actions efficiently
- Security & validation - Ensure agents can only perform authorized actions
- Audit logging - Track all agent-initiated actions
Task Definitions
Copy
Ask AI
from hyrex import HyrexRegistry
import requests
import json
import os
hy = HyrexRegistry()
@hy.task
def send_slack_notification(message: str, channel: str):
"""Send notification to Slack channel"""
payload = {
"text": message,
"channel": channel
}
response = requests.post(
os.environ.get("SLACK_WEBHOOK_URL"),
json=payload
)
if response.status_code != 200:
raise Exception(f"Slack notification failed: {response.text}")
return {"status": "sent", "channel": channel, "message_length": len(message)}
@hy.task
def create_jira_ticket(title: str, description: str, assignee: str, priority: str = "Medium"):
"""Create a new JIRA ticket"""
ticket_data = {
"fields": {
"project": {"key": os.environ.get("JIRA_PROJECT_KEY", "PROJ")},
"summary": title,
"description": {
"content": [
{
"content": [{"text": description, "type": "text"}],
"type": "paragraph"
}
],
"type": "doc",
"version": 1
},
"issuetype": {"name": "Task"},
"assignee": {"displayName": assignee},
"priority": {"name": priority}
}
}
response = requests.post(
f"{os.environ.get('JIRA_BASE_URL')}/rest/api/3/issue",
json=ticket_data,
auth=(os.environ.get("JIRA_EMAIL"), os.environ.get("JIRA_TOKEN"))
)
if response.status_code not in [200, 201]:
raise Exception(f"JIRA ticket creation failed: {response.text}")
ticket = response.json()
return {
"ticket_id": ticket["key"],
"ticket_url": f"{os.environ.get('JIRA_BASE_URL')}/browse/{ticket['key']}",
"title": title,
"assignee": assignee
}
@hy.task
def create_github_issue(repo: str, title: str, body: str, labels: list = None):
"""Create GitHub issue"""
issue_data = {
"title": title,
"body": body,
"labels": labels or []
}
response = requests.post(
f"https://api.github.com/repos/{repo}/issues",
json=issue_data,
headers={
"Authorization": f"token {os.environ.get('GITHUB_TOKEN')}",
"Accept": "application/vnd.github.v3+json"
}
)
if response.status_code != 201:
raise Exception(f"GitHub issue creation failed: {response.text}")
issue = response.json()
return {
"issue_id": issue["number"],
"issue_url": issue["html_url"],
"title": title,
"repo": repo
}
@hy.task
def deploy_service(service_name: str, environment: str = "production", version: str = "latest"):
"""Deploy service to specified environment"""
# Integration with your deployment system (K8s, Docker, etc.)
deployment_config = {
"service": service_name,
"environment": environment,
"version": version,
"triggered_by": "ai_agent"
}
# Example: Trigger deployment pipeline
response = requests.post(
f"{os.environ.get('DEPLOYMENT_API_URL')}/deploy",
json=deployment_config,
headers={"Authorization": f"Bearer {os.environ.get('DEPLOYMENT_TOKEN')}"}
)
if response.status_code != 200:
raise Exception(f"Deployment failed: {response.text}")
deployment = response.json()
return {
"deployment_id": deployment.get("id"),
"service": service_name,
"environment": environment,
"status": "initiated"
}
@hy.task
def send_email_alert(recipients: list, subject: str, body: str, priority: str = "normal"):
"""Send email alert to specified recipients"""
email_data = {
"recipients": recipients,
"subject": subject,
"body": body,
"priority": priority
}
# Integration with your email service
response = requests.post(
f"{os.environ.get('EMAIL_SERVICE_URL')}/send",
json=email_data,
headers={"Authorization": f"Bearer {os.environ.get('EMAIL_SERVICE_TOKEN')}"}
)
if response.status_code != 200:
raise Exception(f"Email sending failed: {response.text}")
return {
"recipients": recipients,
"subject": subject,
"status": "sent"
}
@hy.task
def log_agent_action(agent_id: str, action_type: str, payload: dict, result: dict = None):
"""Log agent action for audit purposes"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"agent_id": agent_id,
"action_type": action_type,
"payload": payload,
"result": result,
"status": "success" if result else "attempted"
}
# Store in your logging system
save_to_audit_log(log_entry)
return log_entry
@hy.task
def process_ai_agent_action(action_type: str, payload: dict, agent_id: str = "unknown"):
"""Process action requested by AI agent"""
result = None
try:
if action_type == "notify_slack":
result = send_slack_notification.send(
payload["message"],
payload["channel"]
).get()
elif action_type == "create_jira_ticket":
result = create_jira_ticket.send(
payload["title"],
payload["description"],
payload["assignee"],
payload.get("priority", "Medium")
).get()
elif action_type == "create_github_issue":
result = create_github_issue.send(
payload["repo"],
payload["title"],
payload["body"],
payload.get("labels", [])
).get()
elif action_type == "deploy_service":
result = deploy_service.send(
payload["service_name"],
payload.get("environment", "production"),
payload.get("version", "latest")
).get()
elif action_type == "send_email":
result = send_email_alert.send(
payload["recipients"],
payload["subject"],
payload["body"],
payload.get("priority", "normal")
).get()
else:
raise ValueError(f"Unsupported action type: {action_type}")
# Log successful action
log_agent_action.send(agent_id, action_type, payload, result)
return {
"success": True,
"action_type": action_type,
"result": result,
"agent_id": agent_id
}
except Exception as e:
# Log failed action
log_agent_action.send(agent_id, action_type, payload, {"error": str(e)})
raise e
REST API Endpoints
Copy
Ask AI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from .tasks import process_ai_agent_action
app = FastAPI()
class AgentActionRequest(BaseModel):
action_type: str
payload: dict
agent_id: Optional[str] = None
class BulkActionRequest(BaseModel):
actions: List[AgentActionRequest]
@app.post("/agent/action")
async def execute_agent_action(request: AgentActionRequest):
"""Execute a single agent action"""
try:
task = process_ai_agent_action.send(
request.action_type,
request.payload,
request.agent_id or "unknown"
)
return {
"message": "Agent action queued successfully",
"task_id": task.task_id,
"action_type": request.action_type,
"agent_id": request.agent_id
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/agent/bulk-actions")
async def execute_bulk_actions(request: BulkActionRequest):
"""Execute multiple agent actions"""
task_ids = []
for action in request.actions:
try:
task = process_ai_agent_action.send(
action.action_type,
action.payload,
action.agent_id or "unknown"
)
task_ids.append({
"task_id": task.task_id,
"action_type": action.action_type,
"agent_id": action.agent_id
})
except Exception as e:
task_ids.append({
"error": str(e),
"action_type": action.action_type,
"agent_id": action.agent_id
})
return {
"message": f"Queued {len([t for t in task_ids if 'task_id' in t])} actions",
"results": task_ids
}
@app.get("/agent/actions/{task_id}/status")
async def get_action_status(task_id: str):
"""Get status of an agent action"""
task = hy.get_task(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.is_complete else None,
"error": task.error if task.has_failed else None,
"created_at": task.created_at,
"completed_at": task.completed_at
}
@app.get("/agent/{agent_id}/actions")
async def get_agent_actions(agent_id: str):
"""Get all actions for a specific agent"""
# Retrieve from audit log
actions = get_agent_actions_from_log(agent_id)
return {
"agent_id": agent_id,
"actions": actions,
"total_actions": len(actions)
}
Usage Examples
Single Agent Action
Copy
Ask AI
# Send Slack notification
curl -X POST http://localhost:8000/agent/action \
-H "Content-Type: application/json" \
-d '{
"action_type": "notify_slack",
"payload": {
"message": "Deployment completed successfully!",
"channel": "#deployments"
},
"agent_id": "deploy_bot_v1"
}'
# Create JIRA ticket
curl -X POST http://localhost:8000/agent/action \
-H "Content-Type: application/json" \
-d '{
"action_type": "create_jira_ticket",
"payload": {
"title": "Investigate high memory usage",
"description": "Memory usage spiked to 95% at 2:30 PM",
"assignee": "john.doe",
"priority": "High"
},
"agent_id": "monitoring_agent"
}'
# Deploy service
curl -X POST http://localhost:8000/agent/action \
-H "Content-Type: application/json" \
-d '{
"action_type": "deploy_service",
"payload": {
"service_name": "user-service",
"environment": "production",
"version": "v1.2.3"
},
"agent_id": "deploy_agent"
}'
Bulk Actions
Copy
Ask AI
curl -X POST http://localhost:8000/agent/bulk-actions \
-H "Content-Type: application/json" \
-d '{
"actions": [
{
"action_type": "notify_slack",
"payload": {
"message": "Starting deployment sequence",
"channel": "#deployments"
}
},
{
"action_type": "deploy_service",
"payload": {
"service_name": "api-gateway",
"environment": "production"
}
},
{
"action_type": "send_email",
"payload": {
"recipients": ["team@company.com"],
"subject": "Production Deployment Started",
"body": "The production deployment has been initiated by the AI agent."
}
}
]
}'
Advanced Agent Patterns
Conditional Actions
Copy
Ask AI
@hy.task
def process_conditional_action(condition: dict, actions: list):
"""Execute actions based on conditions"""
if evaluate_condition(condition):
for action in actions:
process_ai_agent_action.send(
action['type'],
action['payload'],
action.get('agent_id')
)
return {"executed": len(actions), "condition_met": True}
else:
return {"executed": 0, "condition_met": False}
Action Chains
Copy
Ask AI
@hy.task
def execute_action_chain(actions: list, agent_id: str):
"""Execute actions in sequence with dependency handling"""
results = []
for action in actions:
try:
result = process_ai_agent_action.send(
action['type'],
action['payload'],
agent_id
).get()
results.append(result)
# Check if we should continue based on result
if action.get('stop_on_failure') and not result['success']:
break
except Exception as e:
results.append({"error": str(e), "action": action['type']})
if action.get('stop_on_failure', True):
break
return {
"chain_results": results,
"completed_actions": len(results),
"total_actions": len(actions)
}
Permission-Based Actions
Copy
Ask AI
@hy.task
def validate_agent_permissions(agent_id: str, action_type: str):
"""Validate if agent has permission for action"""
agent_permissions = get_agent_permissions(agent_id)
if action_type not in agent_permissions:
raise PermissionError(f"Agent {agent_id} not authorized for {action_type}")
return True
@hy.task
def secure_process_action(action_type: str, payload: dict, agent_id: str):
"""Process action with permission validation"""
# Validate permissions first
validate_agent_permissions.send(agent_id, action_type).get()
# Execute the action
return process_ai_agent_action.send(action_type, payload, agent_id).get()
Security & Compliance
Environment Variables
Copy
Ask AI
# Slack Integration
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
# JIRA Integration
JIRA_BASE_URL=https://your-domain.atlassian.net
JIRA_EMAIL=your-email@company.com
JIRA_TOKEN=your-api-token
JIRA_PROJECT_KEY=PROJ
# GitHub Integration
GITHUB_TOKEN=ghp_your-token
# Deployment Integration
DEPLOYMENT_API_URL=https://your-deployment-api.com
DEPLOYMENT_TOKEN=your-deployment-token
# Email Service
EMAIL_SERVICE_URL=https://your-email-service.com
EMAIL_SERVICE_TOKEN=your-email-token
Action Validation
Copy
Ask AI
ACTION_SCHEMAS = {
"notify_slack": {
"required": ["message", "channel"],
"properties": {
"message": {"type": "string", "maxLength": 4000},
"channel": {"type": "string", "pattern": "^#[a-z0-9_-]+$"}
}
},
"create_jira_ticket": {
"required": ["title", "description", "assignee"],
"properties": {
"title": {"type": "string", "maxLength": 255},
"description": {"type": "string", "maxLength": 32767},
"assignee": {"type": "string"},
"priority": {"enum": ["Lowest", "Low", "Medium", "High", "Highest"]}
}
}
}
Production Considerations
- Rate limiting: Prevent agents from overwhelming external services
- Permission management: Implement role-based access control for agents
- Audit logging: Track all agent actions for compliance and debugging
- Error handling: Graceful failure handling with proper notifications
- Service limits: Respect external API rate limits and quotas
- Security: Secure storage of API keys and authentication tokens