Skip to main content

Documentation Index

Fetch the complete documentation index at: https://hyrex.io/docs/llms.txt

Use this file to discover all available pages before exploring further.

Create workflows that pause for human approval, enabling oversight for critical operations like expense processing, deployments, and sensitive data operations.

Overview

This example demonstrates human-in-the-loop patterns:
  • Approval workflows with email notifications and web interfaces
  • Timeout handling with escalation mechanisms
  • Multi-step processes that pause for human decisions
  • Expense processing with manager approval requirements
  • Deployment gates requiring human sign-off
  • Audit trails for all approval decisions
Perfect for compliance-driven processes, financial operations, or any workflow requiring human oversight.

Task Definitions

from hyrex import HyrexRegistry
import time
from enum import Enum
from datetime import datetime, timedelta

hy = HyrexRegistry()

class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

@hy.task
def send_approval_notification(approver_email: str, task_data: dict, approval_id: str):
    """Send email notification to human approver"""
    email_content = f"""
    New task requires your approval:
    
    Task: {task_data['title']}
    Description: {task_data['description']}
    Requested by: {task_data['requester']}
    Amount: {task_data.get('amount', 'N/A')}
    
    Approve: https://yourapp.com/approvals/{approval_id}/approve
    Reject: https://yourapp.com/approvals/{approval_id}/reject
    
    This approval will timeout in {task_data.get('timeout_hours', 24)} hours.
    """
    
    send_email(
        to=approver_email,
        subject="Task Approval Required",
        body=email_content,
        priority="high"
    )
    
    return {"notification_sent": True, "approver": approver_email}

@hy.task
def wait_for_human_approval(approval_id: str, timeout_hours: int = 24) -> str:
    """Wait for human approval with timeout"""
    start_time = time.time()
    timeout_seconds = timeout_hours * 3600
    check_interval = 30  # seconds
    
    while time.time() - start_time < timeout_seconds:
        # Check approval status in database
        approval = get_approval_status(approval_id)
        
        if approval['status'] == ApprovalStatus.APPROVED.value:
            record_approval_completion(approval_id, "approved", approval.get('comment'))
            return "approved"
        elif approval['status'] == ApprovalStatus.REJECTED.value:
            record_approval_completion(approval_id, "rejected", approval.get('comment'))
            return "rejected"
        
        # Wait before checking again
        time.sleep(check_interval)
    
    # Timeout reached
    record_approval_completion(approval_id, "timeout", "Approval timeout exceeded")
    return "timeout"

@hy.task
def escalate_approval(approval_id: str, original_data: dict, escalation_level: int = 1):
    """Escalate approval to higher authority"""
    escalation_approvers = get_escalation_chain(original_data.get('department'))
    
    if escalation_level >= len(escalation_approvers):
        # No more escalation levels, auto-reject
        update_approval_status(approval_id, "rejected", "Maximum escalation reached")
        return {"status": "auto_rejected", "reason": "max_escalation"}
    
    next_approver = escalation_approvers[escalation_level]
    
    escalated_data = {
        **original_data,
        'title': f"[ESCALATED] {original_data['title']}",
        'escalation_level': escalation_level,
        'original_approver': original_data.get('approver_email'),
        'escalation_reason': 'Previous approval timeout'
    }
    
    # Update approval record with new approver
    update_approval_approver(approval_id, next_approver['email'])
    
    # Send notification to escalated approver
    send_approval_notification.send(
        next_approver['email'],
        escalated_data,
        approval_id
    )
    
    return {
        "escalated_to": next_approver['email'],
        "escalation_level": escalation_level,
        "approval_id": approval_id
    }

@hy.task
def process_expense_report(expense_data: dict, approver_email: str):
    """Process expense report with human approval"""
    
    # Step 1: Validate expense data
    validation_result = validate_expense_data(expense_data)
    if not validation_result['is_valid']:
        raise Exception(f"Invalid expense data: {validation_result['errors']}")
    
    # Step 2: Check if approval is needed
    approval_threshold = get_approval_threshold(expense_data.get('department', 'default'))
    
    if expense_data["amount"] > approval_threshold:
        approval_id = create_approval_record({
            **expense_data,
            'title': f"Expense Report - ${expense_data['amount']:.2f}",
            'description': expense_data.get('description', 'Expense reimbursement request'),
            'requester': expense_data.get('submitter_email'),
            'approver_email': approver_email,
            'timeout_hours': 48
        })
        
        # Send notification to approver
        send_approval_notification.send(
            approver_email, 
            expense_data, 
            approval_id
        )
        
        # Wait for human decision
        approval_result = wait_for_human_approval.send(approval_id, 48).get()
        
        if approval_result == "rejected":
            update_expense_status(expense_data["id"], "rejected")
            send_rejection_notification.send(
                expense_data["submitter_email"],
                expense_data,
                get_approval_comment(approval_id)
            )
            return {"status": "rejected", "reason": "Manager approval denied"}
            
        elif approval_result == "timeout":
            escalation_result = escalate_approval.send(
                approval_id, 
                expense_data, 
                1
            ).get()
            
            # Wait for escalated approval
            escalated_result = wait_for_human_approval.send(approval_id, 24).get()
            
            if escalated_result != "approved":
                update_expense_status(expense_data["id"], "rejected")
                return {"status": "rejected", "reason": f"Escalation {escalated_result}"}
    
    # Step 3: Process approved expense
    payment_result = process_payment.send(expense_data).get()
    update_expense_status(expense_data["id"], "processed")
    
    send_confirmation.send(
        expense_data["submitter_email"],
        expense_data,
        payment_result
    )
    
    return {
        "status": "processed", 
        "amount": expense_data["amount"],
        "payment_id": payment_result.get("payment_id")
    }

@hy.task
def deploy_with_approval(service_name: str, version: str, environment: str, approver_email: str):
    """Deploy service after human approval"""
    
    deployment_data = {
        "title": f"Deploy {service_name} v{version} to {environment}",
        "description": f"Deploy {service_name} version {version} to {environment} environment",
        "service": service_name,
        "version": version,
        "environment": environment,
        "requester": "CI/CD Pipeline",
        "timeout_hours": 2  # Shorter timeout for deployments
    }
    
    # Create approval request
    approval_id = create_approval_record({
        **deployment_data,
        'approver_email': approver_email
    })
    
    # Notify approver
    send_approval_notification.send(
        approver_email,
        deployment_data,
        approval_id
    )
    
    # Wait for approval
    approval_result = wait_for_human_approval.send(approval_id, 2).get()
    
    if approval_result == "approved":
        # Proceed with deployment
        deployment_result = deploy_service.send(
            service_name, 
            version, 
            environment
        ).get()
        
        # Send success notification
        send_deployment_notification.send(
            approver_email,
            "success",
            deployment_data,
            deployment_result
        )
        
        return {
            "status": "deployed", 
            "service": service_name, 
            "version": version,
            "environment": environment,
            "deployment_id": deployment_result.get("deployment_id")
        }
    else:
        # Send cancellation notification
        send_deployment_notification.send(
            approver_email,
            "cancelled",
            deployment_data,
            {"reason": approval_result}
        )
        
        return {
            "status": "deployment_cancelled", 
            "reason": approval_result,
            "service": service_name,
            "version": version
        }

@hy.task
def batch_approval_workflow(items: list, approver_email: str, batch_threshold: int = 10):
    """Process multiple items requiring approval in batches"""
    if len(items) < batch_threshold:
        # Process individually
        results = []
        for item in items:
            if item['type'] == 'expense':
                result = process_expense_report.send(item['data'], approver_email)
            elif item['type'] == 'deployment':
                result = deploy_with_approval.send(
                    item['data']['service'],
                    item['data']['version'],
                    item['data']['environment'],
                    approver_email
                )
            results.append(result.task_id)
        return {"individual_processing": True, "task_ids": results}
    
    # Create batch approval
    batch_data = {
        "title": f"Batch Approval - {len(items)} items",
        "description": f"Approve {len(items)} items: " + ", ".join([item.get('title', f"{item['type']}") for item in items[:3]]) + ("..." if len(items) > 3 else ""),
        "items": items,
        "requester": "batch_processor",
        "timeout_hours": 24
    }
    
    approval_id = create_batch_approval_record(batch_data, approver_email)
    
    # Send batch notification
    send_approval_notification.send(approver_email, batch_data, approval_id)
    
    # Wait for batch decision
    approval_result = wait_for_human_approval.send(approval_id, 24).get()
    
    if approval_result == "approved":
        # Process all items
        results = []
        for item in items:
            # Process each item without individual approval
            result = process_pre_approved_item.send(item)
            results.append(result.task_id)
        
        return {"batch_approved": True, "processed_items": len(items), "task_ids": results}
    else:
        return {"batch_rejected": True, "reason": approval_result, "items_count": len(items)}

REST API Endpoints

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from .tasks import process_expense_report, deploy_with_approval, batch_approval_workflow

app = FastAPI()

class ExpenseRequest(BaseModel):
    expense_data: dict
    approver_email: str
    
class DeploymentRequest(BaseModel):
    service_name: str
    version: str
    environment: str = "production"
    approver_email: str

class ApprovalDecision(BaseModel):
    approval_id: str
    decision: str  # "approve" or "reject"
    comment: Optional[str] = None

class BatchApprovalRequest(BaseModel):
    items: List[dict]
    approver_email: str
    batch_threshold: int = 10

@app.post("/expenses/submit")
async def submit_expense(request: ExpenseRequest):
    """Submit expense for processing with human approval"""
    task = process_expense_report.send(
        request.expense_data,
        request.approver_email
    )
    
    return {
        "message": "Expense submitted for approval",
        "task_id": task.task_id,
        "expense_id": request.expense_data.get("id"),
        "amount": request.expense_data.get("amount"),
        "approver": request.approver_email
    }

@app.post("/deployments/request")
async def request_deployment(request: DeploymentRequest):
    """Request deployment with approval"""
    task = deploy_with_approval.send(
        request.service_name,
        request.version,
        request.environment,
        request.approver_email
    )
    
    return {
        "message": "Deployment requested, awaiting approval",
        "task_id": task.task_id,
        "service": request.service_name,
        "version": request.version,
        "environment": request.environment,
        "approver": request.approver_email
    }

@app.post("/approvals/decide")
async def make_approval_decision(decision: ApprovalDecision):
    """Record human approval/rejection decision"""
    try:
        update_approval_status(
            decision.approval_id,
            decision.decision,
            decision.comment
        )
        
        return {
            "message": f"Approval {decision.decision} recorded",
            "approval_id": decision.approval_id,
            "decision": decision.decision
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/approvals/pending")
async def get_pending_approvals(approver_email: str):
    """Get all pending approvals for an approver"""
    try:
        pending_approvals = get_pending_approvals_for_user(approver_email)
        
        return {
            "pending_approvals": pending_approvals,
            "count": len(pending_approvals),
            "approver_email": approver_email
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/approvals/{approval_id}")
async def get_approval_details(approval_id: str):
    """Get detailed information about a specific approval"""
    approval = get_approval_by_id(approval_id)
    if not approval:
        raise HTTPException(status_code=404, detail="Approval not found")
    
    return approval

@app.post("/approvals/batch")
async def submit_batch_approval(request: BatchApprovalRequest):
    """Submit batch of items for approval"""
    task = batch_approval_workflow.send(
        request.items,
        request.approver_email,
        request.batch_threshold
    )
    
    return {
        "message": "Batch approval workflow started",
        "task_id": task.task_id,
        "items_count": len(request.items),
        "batch_threshold": request.batch_threshold
    }

@app.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
    """Get status of approval workflow task"""
    task = hy.get_task(task_id)
    
    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.is_complete else None,
        "error": task.error if task.has_failed else None,
        "created_at": task.created_at,
        "completed_at": task.completed_at,
        "progress": getattr(task, 'progress', None)
    }

Usage Examples

Submit Expense for Approval

# Submit expense report
curl -X POST http://localhost:8000/expenses/submit \
  -H "Content-Type: application/json" \
  -d '{
    "expense_data": {
      "id": "exp_123",
      "amount": 750.00,
      "description": "Client dinner and taxi",
      "submitter_email": "john@company.com",
      "department": "sales",
      "receipt_urls": ["https://receipts.com/123.jpg"]
    },
    "approver_email": "manager@company.com"
  }'

# Request deployment approval
curl -X POST http://localhost:8000/deployments/request \
  -H "Content-Type: application/json" \
  -d '{
    "service_name": "user-service",
    "version": "v2.1.0",
    "environment": "production",
    "approver_email": "devops@company.com"
  }'

Make Approval Decisions

# Approve a request
curl -X POST http://localhost:8000/approvals/decide \
  -H "Content-Type: application/json" \
  -d '{
    "approval_id": "app_123",
    "decision": "approve",
    "comment": "Looks good, approved for processing"
  }'

# Reject a request
curl -X POST http://localhost:8000/approvals/decide \
  -H "Content-Type: application/json" \
  -d '{
    "approval_id": "app_124",
    "decision": "reject",
    "comment": "Missing receipt for taxi fare"
  }'

Check Status and Pending Approvals

# Get pending approvals for a user
curl "http://localhost:8000/approvals/pending?approver_email=manager@company.com"

# Check task status
curl http://localhost:8000/tasks/task_12345/status

# Get approval details
curl http://localhost:8000/approvals/app_123

Advanced Approval Patterns

Multi-Level Approval

@hy.task
def multi_level_approval(request_data: dict, approval_chain: list):
    """Process request through multiple approval levels"""
    current_level = 0
    
    for approver_config in approval_chain:
        level_data = {
            **request_data,
            'current_level': current_level,
            'total_levels': len(approval_chain),
            'level_name': approver_config['level_name']
        }
        
        approval_id = create_approval_record(level_data, approver_config['approver_email'])
        
        send_approval_notification.send(
            approver_config['approver_email'],
            level_data,
            approval_id
        )
        
        result = wait_for_human_approval.send(
            approval_id, 
            approver_config.get('timeout_hours', 24)
        ).get()
        
        if result != "approved":
            return {
                "status": "rejected_at_level",
                "level": current_level,
                "level_name": approver_config['level_name'],
                "reason": result
            }
        
        current_level += 1
    
    return {
        "status": "fully_approved",
        "levels_completed": len(approval_chain)
    }

Conditional Approval Logic

@hy.task
def smart_approval_routing(request_data: dict):
    """Route approval based on request attributes"""
    approval_rules = get_approval_rules(request_data['type'])
    
    for rule in approval_rules:
        if evaluate_condition(rule['condition'], request_data):
            if rule['action'] == 'auto_approve':
                return {"status": "auto_approved", "rule": rule['name']}
            elif rule['action'] == 'require_approval':
                return process_approval_requirement(request_data, rule['approvers'])
            elif rule['action'] == 'escalate':
                return escalate_to_committee(request_data, rule['committee'])
    
    # Default fallback
    return {"status": "no_matching_rule", "requires_manual_review": True}

Frontend Integration

Approval Dashboard Component

// React component for approval dashboard
function ApprovalDashboard({ userEmail }) {
  const [pendingApprovals, setPendingApprovals] = useState([]);
  
  useEffect(() => {
    fetchPendingApprovals(userEmail).then(setPendingApprovals);
  }, [userEmail]);
  
  const handleApproval = async (approvalId, decision, comment) => {
    await fetch('/approvals/decide', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ approvalId, decision, comment })
    });
    
    // Refresh pending approvals
    const updated = await fetchPendingApprovals(userEmail);
    setPendingApprovals(updated);
  };
  
  return (
    <div className="approval-dashboard">
      <h2>Pending Approvals ({pendingApprovals.length})</h2>
      {pendingApprovals.map(approval => (
        <ApprovalCard 
          key={approval.id}
          approval={approval}
          onApprove={(comment) => handleApproval(approval.id, 'approve', comment)}
          onReject={(comment) => handleApproval(approval.id, 'reject', comment)}
        />
      ))}
    </div>
  );
}

Production Considerations

  • Timeout strategies: Implement appropriate timeouts with escalation
  • Notification reliability: Use multiple channels (email, Slack, SMS) for critical approvals
  • Audit requirements: Maintain complete audit trails for compliance
  • Mobile access: Ensure approvers can respond from mobile devices
  • Backup approvers: Handle approver unavailability with delegation
  • Bulk operations: Optimize for high-volume approval scenarios

Next Steps

Error Handling

Handle approval workflow failures

AI Agent Actions

Combine AI automation with human oversight