Build AI agents that can take real-world actions by integrating with external services like Slack, JIRA, GitHub, and deployment systems through Hyrex task orchestration.

Overview

This example demonstrates how to build AI agents that can take actions:
  • Multi-service integration - Slack notifications, JIRA tickets, deployments
  • Action routing - Dispatch different actions based on agent requests
  • Bulk operations - Process multiple agent actions efficiently
  • Security & validation - Ensure agents can only perform authorized actions
  • Audit logging - Track all agent-initiated actions
Perfect for building AI assistants, automated workflows, or agent-driven DevOps systems.

Task Definitions

from hyrex import HyrexRegistry
import requests
import json
import os

# Registry whose `task` decorator is applied to every task function in this module.
hy = HyrexRegistry()

@hy.task
def send_slack_notification(message: str, channel: str):
    """Send a notification to Slack by POSTing to the configured webhook URL.

    Args:
        message: Text of the notification.
        channel: Channel name placed in the payload's "channel" field.

    Returns:
        dict with "status" ("sent"), "channel" and "message_length".

    Raises:
        RuntimeError: If the SLACK_WEBHOOK_URL environment variable is unset.
        Exception: If the webhook answers with a status other than 200.
    """
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        # Fail with a clear message instead of handing requests a None URL.
        raise RuntimeError("SLACK_WEBHOOK_URL is not set")

    payload = {
        "text": message,
        "channel": channel,
    }

    # Bound the call so an unresponsive endpoint cannot block the worker forever.
    response = requests.post(webhook_url, json=payload, timeout=10)

    if response.status_code != 200:
        raise Exception(f"Slack notification failed: {response.text}")

    return {"status": "sent", "channel": channel, "message_length": len(message)}

@hy.task
def create_jira_ticket(title: str, description: str, assignee: str, priority: str = "Medium"):
    """Create a new JIRA ticket of type "Task" via the REST v3 issue endpoint.

    Args:
        title: Issue summary.
        description: Plain text, wrapped in a single-paragraph document body.
        assignee: Value sent as the assignee's "displayName".
        priority: Priority name; defaults to "Medium".

    Returns:
        dict with "ticket_id", "ticket_url", "title" and "assignee".

    Raises:
        RuntimeError: If the JIRA_BASE_URL environment variable is unset.
        Exception: If JIRA answers with a status other than 200 or 201.
    """
    base_url = os.environ.get("JIRA_BASE_URL")
    if not base_url:
        # Otherwise the request would be sent to the literal URL "None/rest/...".
        raise RuntimeError("JIRA_BASE_URL is not set")
    # Tolerate a trailing slash so the URLs built below never contain "//".
    base_url = base_url.rstrip("/")

    ticket_data = {
        "fields": {
            "project": {"key": os.environ.get("JIRA_PROJECT_KEY", "PROJ")},
            "summary": title,
            "description": {
                "content": [
                    {
                        "content": [{"text": description, "type": "text"}],
                        "type": "paragraph"
                    }
                ],
                "type": "doc",
                "version": 1
            },
            "issuetype": {"name": "Task"},
            # NOTE(review): Jira Cloud's v3 API appears to identify users by
            # accountId rather than displayName - confirm this assignee form
            # is accepted by the target instance.
            "assignee": {"displayName": assignee},
            "priority": {"name": priority}
        }
    }

    response = requests.post(
        f"{base_url}/rest/api/3/issue",
        json=ticket_data,
        auth=(os.environ.get("JIRA_EMAIL"), os.environ.get("JIRA_TOKEN")),
        timeout=10,  # never let a stalled connection hang the worker
    )

    if response.status_code not in [200, 201]:
        raise Exception(f"JIRA ticket creation failed: {response.text}")

    ticket = response.json()
    return {
        "ticket_id": ticket["key"],
        "ticket_url": f"{base_url}/browse/{ticket['key']}",
        "title": title,
        "assignee": assignee
    }

@hy.task
def create_github_issue(repo: str, title: str, body: str, labels: list = None):
    """Create a GitHub issue in the given repository.

    Args:
        repo: Repository path interpolated into the API URL
            (presumably "owner/name" - confirm against callers).
        title: Issue title.
        body: Issue body text.
        labels: Optional list of label names; None is sent as an empty list.

    Returns:
        dict with "issue_id" (the issue number), "issue_url", "title" and "repo".

    Raises:
        Exception: If GitHub answers with a status other than 201.
    """
    issue_data = {
        "title": title,
        "body": body,
        "labels": labels or [],
    }
    headers = {
        "Authorization": f"token {os.environ.get('GITHUB_TOKEN')}",
        "Accept": "application/vnd.github.v3+json",
    }

    response = requests.post(
        f"https://api.github.com/repos/{repo}/issues",
        json=issue_data,
        headers=headers,
        timeout=10,  # never let a stalled connection hang the worker
    )

    if response.status_code != 201:
        raise Exception(f"GitHub issue creation failed: {response.text}")

    issue = response.json()
    return {
        "issue_id": issue["number"],
        "issue_url": issue["html_url"],
        "title": title,
        "repo": repo
    }

@hy.task
def deploy_service(service_name: str, environment: str = "production", version: str = "latest"):
    """Trigger a deployment by POSTing to the deployment API's /deploy endpoint.

    Args:
        service_name: Name of the service to deploy.
        environment: Target environment; defaults to "production".
        version: Version to deploy; defaults to "latest".

    Returns:
        dict with "deployment_id" (the "id" field of the API response, or None
        when absent), "service", "environment" and "status" ("initiated").

    Raises:
        RuntimeError: If the DEPLOYMENT_API_URL environment variable is unset.
        Exception: If the API answers with a status other than 200.
    """
    api_url = os.environ.get("DEPLOYMENT_API_URL")
    if not api_url:
        # Otherwise the request would be sent to the literal URL "None/deploy".
        raise RuntimeError("DEPLOYMENT_API_URL is not set")

    # Integration with your deployment system (K8s, Docker, etc.)
    deployment_config = {
        "service": service_name,
        "environment": environment,
        "version": version,
        "triggered_by": "ai_agent"
    }

    # Example: Trigger deployment pipeline
    response = requests.post(
        f"{api_url}/deploy",
        json=deployment_config,
        headers={"Authorization": f"Bearer {os.environ.get('DEPLOYMENT_TOKEN')}"},
        timeout=30,  # never let a stalled connection hang the worker
    )

    if response.status_code != 200:
        raise Exception(f"Deployment failed: {response.text}")

    deployment = response.json()
    return {
        "deployment_id": deployment.get("id"),
        "service": service_name,
        "environment": environment,
        "status": "initiated"
    }

@hy.task
def send_email_alert(recipients: list, subject: str, body: str, priority: str = "normal"):
    """Send an email alert by POSTing to the email service's /send endpoint.

    Args:
        recipients: List of recipient addresses.
        subject: Email subject line.
        body: Email body text.
        priority: Priority value forwarded to the service; defaults to "normal".

    Returns:
        dict with "recipients", "subject" and "status" ("sent").

    Raises:
        RuntimeError: If the EMAIL_SERVICE_URL environment variable is unset.
        Exception: If the service answers with a status other than 200.
    """
    service_url = os.environ.get("EMAIL_SERVICE_URL")
    if not service_url:
        # Otherwise the request would be sent to the literal URL "None/send".
        raise RuntimeError("EMAIL_SERVICE_URL is not set")

    email_data = {
        "recipients": recipients,
        "subject": subject,
        "body": body,
        "priority": priority
    }

    # Integration with your email service
    response = requests.post(
        f"{service_url}/send",
        json=email_data,
        headers={"Authorization": f"Bearer {os.environ.get('EMAIL_SERVICE_TOKEN')}"},
        timeout=10,  # never let a stalled connection hang the worker
    )

    if response.status_code != 200:
        raise Exception(f"Email sending failed: {response.text}")

    return {
        "recipients": recipients,
        "subject": subject,
        "status": "sent"
    }

@hy.task
def log_agent_action(agent_id: str, action_type: str, payload: dict, result: dict = None):
    """Build an audit-log entry for an agent action and store it.

    Args:
        agent_id: Identifier of the agent that requested the action.
        action_type: Name of the requested action.
        payload: Arguments the agent supplied for the action.
        result: Outcome of the action. A dict containing an "error" key marks
            a failure; None or an empty dict means no outcome is known yet.

    Returns:
        The stored entry, with "timestamp", "agent_id", "action_type",
        "payload", "result" and "status" ("success", "failed" or "attempted").
    """
    # Imported locally because the module's import block does not provide it.
    from datetime import datetime

    if not result:
        status = "attempted"
    elif "error" in result:
        # Failure results carry an "error" key and must not be logged as success.
        status = "failed"
    else:
        status = "success"

    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "agent_id": agent_id,
        "action_type": action_type,
        "payload": payload,
        "result": result,
        "status": status
    }

    # Store in your logging system
    save_to_audit_log(log_entry)
    return log_entry

@hy.task
def process_ai_agent_action(action_type: str, payload: dict, agent_id: str = "unknown"):
    """Dispatch an agent-requested action to the matching task and wait for it.

    Args:
        action_type: One of "notify_slack", "create_jira_ticket",
            "create_github_issue", "deploy_service" or "send_email".
        payload: Arguments for the action; required keys depend on action_type.
        agent_id: Identifier of the requesting agent, used for audit logging.

    Returns:
        dict with "success" (True), "action_type", "result" and "agent_id".

    Raises:
        ValueError: If action_type is not supported.
        Exception: Whatever the dispatched task (or a missing payload key)
            raises; the failure is sent to the audit log before re-raising.
    """
    try:
        if action_type == "notify_slack":
            result = send_slack_notification.send(
                payload["message"],
                payload["channel"]
            ).get()

        elif action_type == "create_jira_ticket":
            result = create_jira_ticket.send(
                payload["title"],
                payload["description"],
                payload["assignee"],
                payload.get("priority", "Medium")
            ).get()

        elif action_type == "create_github_issue":
            result = create_github_issue.send(
                payload["repo"],
                payload["title"],
                payload["body"],
                payload.get("labels", [])
            ).get()

        elif action_type == "deploy_service":
            result = deploy_service.send(
                payload["service_name"],
                payload.get("environment", "production"),
                payload.get("version", "latest")
            ).get()

        elif action_type == "send_email":
            result = send_email_alert.send(
                payload["recipients"],
                payload["subject"],
                payload["body"],
                payload.get("priority", "normal")
            ).get()

        else:
            raise ValueError(f"Unsupported action type: {action_type}")

    except Exception as e:
        # Log failed action, then re-raise with the original traceback intact.
        log_agent_action.send(agent_id, action_type, payload, {"error": str(e)})
        raise

    else:
        # Log successful action. Kept outside the try body so that a problem
        # enqueuing the audit entry is never recorded as a failure of an
        # action that actually ran.
        log_agent_action.send(agent_id, action_type, payload, result)

        return {
            "success": True,
            "action_type": action_type,
            "result": result,
            "agent_id": agent_id
        }

REST API Endpoints

from typing import List, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from .tasks import hy, process_ai_agent_action

# FastAPI application on which the agent endpoints below are registered.
app = FastAPI()

# Request body describing one action an agent wants performed.
class AgentActionRequest(BaseModel):
    # Name of the action, forwarded unchanged to process_ai_agent_action.
    action_type: str
    # Action-specific arguments; not validated at this layer.
    payload: dict
    # Optional caller identity; the handlers substitute "unknown" when omitted.
    agent_id: Optional[str] = None
    
# Request body carrying several agent actions to be queued in one call.
class BulkActionRequest(BaseModel):
    # Actions are queued one by one in the order given.
    actions: List[AgentActionRequest]

@app.post("/agent/action")
async def execute_agent_action(request: AgentActionRequest):
    """Queue a single agent action for background processing.

    The action is only enqueued here; it is not run in the request. The
    returned "task_id" identifies the queued task.

    Args:
        request: Action type, payload and optional agent ID.

    Returns:
        dict with "message", "task_id", "action_type" and "agent_id"
        (as supplied by the caller, possibly None).

    Raises:
        HTTPException: 400 if the action could not be queued.
    """
    # A missing agent ID is recorded as "unknown" on the queued task.
    effective_agent_id = request.agent_id or "unknown"

    try:
        task = process_ai_agent_action.send(
            request.action_type,
            request.payload,
            effective_agent_id
        )

        return {
            "message": "Agent action queued successfully",
            "task_id": task.task_id,
            "action_type": request.action_type,
            "agent_id": request.agent_id
        }
    except Exception as e:
        # NOTE(review): failures here look like queueing problems rather than
        # bad client input - confirm whether a 5xx status would fit better.
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=400, detail=str(e)) from e

@app.post("/agent/bulk-actions") 
async def execute_bulk_actions(request: BulkActionRequest):
    """Queue every action in the request and report a per-action outcome.

    A failure to queue one action does not stop the others: its entry in
    "results" carries an "error" string instead of a "task_id".
    """
    outcomes = []
    queued_count = 0

    for item in request.actions:
        # Fields shared by the success and the error form of an outcome entry.
        common = {
            "action_type": item.action_type,
            "agent_id": item.agent_id
        }
        try:
            handle = process_ai_agent_action.send(
                item.action_type,
                item.payload,
                item.agent_id or "unknown"
            )
            outcome = {"task_id": handle.task_id, **common}
            queued_count += 1
        except Exception as exc:
            outcome = {"error": str(exc), **common}
        outcomes.append(outcome)

    return {
        "message": f"Queued {queued_count} actions",
        "results": outcomes
    }

@app.get("/agent/actions/{task_id}/status")
async def get_action_status(task_id: str):
    """Report the status of a previously queued agent action.

    Args:
        task_id: ID returned when the action was queued.

    Returns:
        dict with "task_id", "status", "result" (only once the task is
        complete), "error" (only if the task failed), "created_at" and
        "completed_at".

    Raises:
        HTTPException: 404 if no task is found for task_id.
    """
    task = hy.get_task(task_id)

    # NOTE(review): assumes get_task returns None for an unknown ID - confirm
    # against the Hyrex API; if it raises instead, this guard is never hit.
    if task is None:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")

    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.is_complete else None,
        "error": task.error if task.has_failed else None,
        "created_at": task.created_at,
        "completed_at": task.completed_at
    }

@app.get("/agent/{agent_id}/actions")
async def get_agent_actions(agent_id: str):
    """Return the audit-log entries recorded for one agent, with their count."""
    # The audit log is the source of truth for what an agent has done.
    history = get_agent_actions_from_log(agent_id)
    action_count = len(history)

    response = {
        "agent_id": agent_id,
        "actions": history,
        "total_actions": action_count
    }
    return response

Usage Examples

Single Agent Action

# Queue a Slack notification (action_type "notify_slack")
curl -X POST http://localhost:8000/agent/action \
  -H "Content-Type: application/json" \
  -d '{
    "action_type": "notify_slack",
    "payload": {
      "message": "Deployment completed successfully!",
      "channel": "#deployments"
    },
    "agent_id": "deploy_bot_v1"
  }'

# Queue creation of a JIRA ticket (action_type "create_jira_ticket")
curl -X POST http://localhost:8000/agent/action \
  -H "Content-Type: application/json" \
  -d '{
    "action_type": "create_jira_ticket",
    "payload": {
      "title": "Investigate high memory usage",
      "description": "Memory usage spiked to 95% at 2:30 PM",
      "assignee": "john.doe",
      "priority": "High"
    },
    "agent_id": "monitoring_agent"
  }'

# Queue a service deployment (action_type "deploy_service")
curl -X POST http://localhost:8000/agent/action \
  -H "Content-Type: application/json" \
  -d '{
    "action_type": "deploy_service",
    "payload": {
      "service_name": "user-service",
      "environment": "production",
      "version": "v1.2.3"
    },
    "agent_id": "deploy_agent"
  }'

Bulk Actions

# Queue three actions in one request. No "agent_id" is given, so the
# endpoint queues each action under the agent ID "unknown".
curl -X POST http://localhost:8000/agent/bulk-actions \
  -H "Content-Type: application/json" \
  -d '{
    "actions": [
      {
        "action_type": "notify_slack",
        "payload": {
          "message": "Starting deployment sequence",
          "channel": "#deployments"
        }
      },
      {
        "action_type": "deploy_service",
        "payload": {
          "service_name": "api-gateway",
          "environment": "production"
        }
      },
      {
        "action_type": "send_email",
        "payload": {
          "recipients": ["team@company.com"],
          "subject": "Production Deployment Started",
          "body": "The production deployment has been initiated by the AI agent."
        }
      }
    ]
  }'

Advanced Agent Patterns

Conditional Actions

@hy.task
def process_conditional_action(condition: dict, actions: list):
    """Queue a list of actions only when the given condition holds.

    Args:
        condition: Condition description handed to evaluate_condition.
        actions: List of dicts with "type" and "payload" keys and an optional
            "agent_id".

    Returns:
        dict with "executed" (number of actions queued) and "condition_met".
        The actions are queued without waiting for their results.
    """
    if not evaluate_condition(condition):
        return {"executed": 0, "condition_met": False}

    for action in actions:
        process_ai_agent_action.send(
            action['type'],
            action['payload'],
            # An explicit None would override the task's "unknown" default,
            # so substitute it here when the action names no agent.
            action.get('agent_id') or "unknown"
        )
    return {"executed": len(actions), "condition_met": True}

Action Chains

@hy.task
def execute_action_chain(actions: list, agent_id: str):
    """Run actions one after another, optionally stopping at the first failure.

    Args:
        actions: List of dicts with "type" and "payload" keys and an optional
            "stop_on_failure" flag (defaults to True).
        agent_id: Agent ID passed to every action in the chain.

    Returns:
        dict with "chain_results" (one entry per attempted action, either the
        action's result or an {"error", "action"} dict), "completed_actions"
        (number of entries in chain_results, failures included) and
        "total_actions".
    """
    results = []

    for action in actions:
        # Resolve the flag once so both failure paths share the same default.
        stop_on_failure = action.get('stop_on_failure', True)

        try:
            # Wait for each action before starting the next one.
            result = process_ai_agent_action.send(
                action['type'],
                action['payload'],
                agent_id
            ).get()

            results.append(result)

            # Check if we should continue based on result
            if stop_on_failure and not result['success']:
                break

        except Exception as e:
            results.append({"error": str(e), "action": action['type']})
            if stop_on_failure:
                break

    return {
        "chain_results": results,
        "completed_actions": len(results),
        "total_actions": len(actions)
    }

Permission-Based Actions

@hy.task
def validate_agent_permissions(agent_id: str, action_type: str):
    """Return True when the agent may perform the action, else raise PermissionError."""
    allowed_actions = get_agent_permissions(agent_id)

    # Happy path first; anything not explicitly granted is rejected.
    if action_type in allowed_actions:
        return True

    raise PermissionError(f"Agent {agent_id} not authorized for {action_type}")

@hy.task
def secure_process_action(action_type: str, payload: dict, agent_id: str):
    """Check the agent's permission for the action, then run it and return its result."""
    # Waiting on the permission task re-raises its failure here, so the
    # action below is never dispatched for an unauthorized agent.
    permission_check = validate_agent_permissions.send(agent_id, action_type)
    permission_check.get()

    # Permission granted: dispatch the action and wait for its outcome.
    action_run = process_ai_agent_action.send(action_type, payload, agent_id)
    return action_run.get()

Security & Compliance

Environment Variables

# Slack Integration
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL

# JIRA Integration  
JIRA_BASE_URL=https://your-domain.atlassian.net
JIRA_EMAIL=your-email@company.com
JIRA_TOKEN=your-api-token
JIRA_PROJECT_KEY=PROJ

# GitHub Integration
GITHUB_TOKEN=ghp_your-token

# Deployment Integration
DEPLOYMENT_API_URL=https://your-deployment-api.com
DEPLOYMENT_TOKEN=your-deployment-token

# Email Service
EMAIL_SERVICE_URL=https://your-email-service.com
EMAIL_SERVICE_TOKEN=your-email-token

Action Validation

# Payload schema for the "notify_slack" action (JSON-Schema-style keywords).
_NOTIFY_SLACK_SCHEMA = {
    "required": ["message", "channel"],
    "properties": {
        "message": {"type": "string", "maxLength": 4000},
        "channel": {"type": "string", "pattern": "^#[a-z0-9_-]+$"},
    },
}

# Payload schema for the "create_jira_ticket" action; "priority" is optional.
_CREATE_JIRA_TICKET_SCHEMA = {
    "required": ["title", "description", "assignee"],
    "properties": {
        "title": {"type": "string", "maxLength": 255},
        "description": {"type": "string", "maxLength": 32767},
        "assignee": {"type": "string"},
        "priority": {"enum": ["Lowest", "Low", "Medium", "High", "Highest"]},
    },
}

# Schemas keyed by action type.
ACTION_SCHEMAS = {
    "notify_slack": _NOTIFY_SLACK_SCHEMA,
    "create_jira_ticket": _CREATE_JIRA_TICKET_SCHEMA,
}

Production Considerations

  • Rate limiting: Prevent agents from overwhelming external services
  • Permission management: Implement role-based access control for agents
  • Audit logging: Track all agent actions for compliance and debugging
  • Error handling: Graceful failure handling with proper notifications
  • Service limits: Respect external API rate limits and quotas
  • Security: Secure storage of API keys and authentication tokens

Next Steps