Create workflows that pause for human approval, enabling oversight for critical operations like expense processing, deployments, and sensitive data operations.

Overview

This example demonstrates human-in-the-loop patterns:
  • Approval workflows with email notifications and web interfaces
  • Timeout handling with escalation mechanisms
  • Multi-step processes that pause for human decisions
  • Expense processing with manager approval requirements
  • Deployment gates requiring human sign-off
  • Audit trails for all approval decisions
Perfect for compliance-driven processes, financial operations, or any workflow requiring human oversight.

Task Definitions

from hyrex import HyrexRegistry
import time
from enum import Enum
from datetime import datetime, timedelta

# Registry instance whose `task` decorator is applied to every task function below.
hy = HyrexRegistry()

class ApprovalStatus(Enum):
    """Possible states of an approval record.

    The string values are what `wait_for_human_approval` compares against the
    `status` field returned by `get_approval_status`.
    """

    PENDING = "pending"    # no decision recorded yet
    APPROVED = "approved"  # approver accepted the request
    REJECTED = "rejected"  # approver declined the request

@hy.task
def send_approval_notification(approver_email: str, task_data: dict, approval_id: str):
    """Email the approver a summary of the request plus approve/reject links.

    Args:
        approver_email: Address the notification is sent to.
        task_data: Request details. ``title``, ``description`` and
            ``requester`` are required; ``amount`` and ``timeout_hours`` are
            optional and fall back to ``'N/A'`` and ``24``.
        approval_id: Identifier embedded in the approve/reject URLs.

    Returns:
        A dict confirming the send and echoing the approver address.

    Raises:
        KeyError: If a required key is missing from ``task_data``.
    """
    amount = task_data.get('amount', 'N/A')
    timeout_hours = task_data.get('timeout_hours', 24)
    approval_url = f"https://yourapp.com/approvals/{approval_id}"

    message_body = f"""
    New task requires your approval:
    
    Task: {task_data['title']}
    Description: {task_data['description']}
    Requested by: {task_data['requester']}
    Amount: {amount}
    
    Approve: {approval_url}/approve
    Reject: {approval_url}/reject
    
    This approval will timeout in {timeout_hours} hours.
    """

    send_email(to=approver_email, subject="Task Approval Required", body=message_body, priority="high")

    return {"notification_sent": True, "approver": approver_email}

@hy.task
def wait_for_human_approval(approval_id: str, timeout_hours: int = 24) -> str:
    """Poll the approval record until a decision is made or the timeout elapses.

    The status is checked immediately and then after every sleep, including
    once at the deadline, so a decision recorded during the final sleep is
    still honoured instead of being reported as a timeout.

    Args:
        approval_id: Identifier of the approval record to poll.
        timeout_hours: Maximum time to wait for a decision, in hours.

    Returns:
        ``"approved"``, ``"rejected"`` or ``"timeout"``.
    """
    check_interval = 30  # seconds between status polls
    # Monotonic clock: the deadline measures elapsed time, so system clock
    # adjustments must not lengthen or shorten the approval window.
    deadline = time.monotonic() + timeout_hours * 3600
    decided_statuses = (ApprovalStatus.APPROVED.value, ApprovalStatus.REJECTED.value)

    while True:
        # Check approval status in database
        approval = get_approval_status(approval_id)
        status = approval['status']

        if status in decided_statuses:
            record_approval_completion(approval_id, status, approval.get('comment'))
            return status

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break

        # Never sleep past the deadline.
        time.sleep(min(check_interval, remaining))

    # Timeout reached
    record_approval_completion(approval_id, "timeout", "Approval timeout exceeded")
    return "timeout"

@hy.task
def escalate_approval(approval_id: str, original_data: dict, escalation_level: int = 1):
    """Hand a pending approval to the approver at ``escalation_level``.

    The escalation chain is looked up from the request's ``department``. When
    the requested level is past the end of the chain, the approval is marked
    rejected and an ``auto_rejected`` result is returned. Otherwise the record
    is reassigned and a notification task is queued for the new approver.

    Args:
        approval_id: Identifier of the approval being escalated.
        original_data: The request data; must contain ``title``.
        escalation_level: Index into the escalation chain.

    Returns:
        Either the auto-rejection dict or a dict naming the new approver,
        the level and the approval id.
    """
    chain = get_escalation_chain(original_data.get('department'))

    chain_exhausted = escalation_level >= len(chain)
    if chain_exhausted:
        # Nobody left to escalate to: close the approval as rejected.
        update_approval_status(approval_id, "rejected", "Maximum escalation reached")
        return {"status": "auto_rejected", "reason": "max_escalation"}

    target = chain[escalation_level]

    payload = dict(original_data)
    payload.update({
        'title': f"[ESCALATED] {original_data['title']}",
        'escalation_level': escalation_level,
        'original_approver': original_data.get('approver_email'),
        'escalation_reason': 'Previous approval timeout',
    })

    # Reassign the record first, then notify the new approver.
    update_approval_approver(approval_id, target['email'])
    send_approval_notification.send(target['email'], payload, approval_id)

    result = {"escalated_to": target['email']}
    result["escalation_level"] = escalation_level
    result["approval_id"] = approval_id
    return result

@hy.task
def process_expense_report(expense_data: dict, approver_email: str):
    """Validate an expense, obtain approval if it exceeds the threshold, then pay it.

    Expenses above the department's approval threshold are sent to
    ``approver_email``. A rejection ends the workflow; a timeout escalates the
    approval once and waits for the escalated approver. Expenses at or below
    the threshold are paid without approval.

    Args:
        expense_data: Expense details. Must contain ``amount`` and ``id``;
            ``submitter_email`` is required for notifications. ``department``
            and ``description`` are optional.
        approver_email: First-level approver.

    Returns:
        ``{"status": "processed", ...}`` on payment, or
        ``{"status": "rejected", "reason": ...}`` when approval is denied.

    Raises:
        ValueError: If ``validate_expense_data`` reports the data as invalid.
    """
    approval_timeout_hours = 48
    escalation_timeout_hours = 24

    # Step 1: Validate expense data
    validation_result = validate_expense_data(expense_data)
    if not validation_result['is_valid']:
        raise ValueError(f"Invalid expense data: {validation_result['errors']}")

    # Step 2: Check if approval is needed
    approval_threshold = get_approval_threshold(expense_data.get('department', 'default'))

    if expense_data["amount"] > approval_threshold:
        # One enriched dict is used for the record, the notification and any
        # escalation: the notification and escalation tasks read 'title',
        # 'description' and 'requester', which raw expense_data does not carry.
        approval_data = {
            **expense_data,
            'title': f"Expense Report - ${expense_data['amount']:.2f}",
            'description': expense_data.get('description', 'Expense reimbursement request'),
            'requester': expense_data.get('submitter_email'),
            'approver_email': approver_email,
            'timeout_hours': approval_timeout_hours,
        }
        approval_id = create_approval_record(approval_data)

        # Send notification to approver
        send_approval_notification.send(approver_email, approval_data, approval_id)

        # Wait for human decision
        approval_result = wait_for_human_approval.send(
            approval_id, approval_timeout_hours
        ).get()

        if approval_result == "rejected":
            update_expense_status(expense_data["id"], "rejected")
            send_rejection_notification.send(
                expense_data["submitter_email"],
                expense_data,
                get_approval_comment(approval_id)
            )
            return {"status": "rejected", "reason": "Manager approval denied"}

        if approval_result == "timeout":
            # The escalated email should state the escalated waiting period.
            escalation_data = {**approval_data, 'timeout_hours': escalation_timeout_hours}
            escalation_result = escalate_approval.send(approval_id, escalation_data, 1).get()

            # escalate_approval auto-rejects when the chain is exhausted;
            # nobody was notified, so waiting would only delay the rejection.
            if escalation_result.get("status") == "auto_rejected":
                update_expense_status(expense_data["id"], "rejected")
                return {"status": "rejected", "reason": "Escalation auto_rejected"}

            # Wait for escalated approval
            escalated_result = wait_for_human_approval.send(
                approval_id, escalation_timeout_hours
            ).get()

            if escalated_result != "approved":
                update_expense_status(expense_data["id"], "rejected")
                return {"status": "rejected", "reason": f"Escalation {escalated_result}"}

    # Step 3: Process approved expense
    payment_result = process_payment.send(expense_data).get()
    update_expense_status(expense_data["id"], "processed")

    send_confirmation.send(
        expense_data["submitter_email"],
        expense_data,
        payment_result
    )

    return {
        "status": "processed",
        "amount": expense_data["amount"],
        "payment_id": payment_result.get("payment_id")
    }

@hy.task
def deploy_with_approval(service_name: str, version: str, environment: str, approver_email: str):
    """Gate a deployment behind a two-hour human approval.

    Creates an approval record, notifies the approver and waits for the
    decision. Only an ``"approved"`` result triggers the deployment; any
    other outcome cancels it. The approver is notified in both cases.

    Args:
        service_name: Service to deploy.
        version: Version to deploy.
        environment: Target environment.
        approver_email: Person who must sign off.

    Returns:
        A ``"deployed"`` dict with the deployment id, or a
        ``"deployment_cancelled"`` dict carrying the approval outcome.
    """
    timeout_hours = 2  # Shorter timeout for deployments

    request_info = {
        "title": f"Deploy {service_name} v{version} to {environment}",
        "description": f"Deploy {service_name} version {version} to {environment} environment",
        "service": service_name,
        "version": version,
        "environment": environment,
        "requester": "CI/CD Pipeline",
        "timeout_hours": timeout_hours,
    }

    # The stored record also carries the approver; the notification payload does not.
    approval_id = create_approval_record(dict(request_info, approver_email=approver_email))

    send_approval_notification.send(approver_email, request_info, approval_id)

    outcome = wait_for_human_approval.send(approval_id, timeout_hours).get()

    if outcome != "approved":
        # Rejected or timed out: tell the approver nothing was deployed.
        send_deployment_notification.send(
            approver_email, "cancelled", request_info, {"reason": outcome}
        )
        return {
            "status": "deployment_cancelled",
            "reason": outcome,
            "service": service_name,
            "version": version,
        }

    rollout = deploy_service.send(service_name, version, environment).get()

    send_deployment_notification.send(approver_email, "success", request_info, rollout)

    return {
        "status": "deployed",
        "service": service_name,
        "version": version,
        "environment": environment,
        "deployment_id": rollout.get("deployment_id"),
    }

@hy.task
def batch_approval_workflow(items: list, approver_email: str, batch_threshold: int = 10):
    """Route items through individual workflows or a single batch approval.

    Fewer than ``batch_threshold`` items are dispatched one by one to the
    expense or deployment workflow. Otherwise a single batch approval is
    requested and, once approved, every item is processed without an
    individual approval.

    Args:
        items: Dicts with a ``type`` (``'expense'`` or ``'deployment'`` for
            individual processing), a ``data`` payload and an optional
            ``title``.
        approver_email: Approver for the individual workflows or the batch.
        batch_threshold: Minimum item count that triggers batch approval.

    Returns:
        A dict describing which path was taken and the queued task ids, or
        the batch rejection reason.

    Raises:
        ValueError: If an item routed to individual processing has an
            unsupported ``type``.
    """
    if len(items) < batch_threshold:
        # Validate before dispatching anything, so an unknown type neither
        # half-submits the list nor reuses the previous item's task handle.
        supported_types = ('expense', 'deployment')
        unsupported = sorted(
            {str(item['type']) for item in items if item['type'] not in supported_types}
        )
        if unsupported:
            raise ValueError(f"Unsupported item type(s): {', '.join(unsupported)}")

        # Process individually
        task_ids = []
        for item in items:
            data = item['data']
            if item['type'] == 'expense':
                result = process_expense_report.send(data, approver_email)
            else:
                result = deploy_with_approval.send(
                    data['service'],
                    data['version'],
                    data['environment'],
                    approver_email
                )
            task_ids.append(result.task_id)
        return {"individual_processing": True, "task_ids": task_ids}

    # Preview the first three items; fall back to the type only when an item
    # has no title, so titled items need not carry a 'type' key here.
    preview = ", ".join(
        item['title'] if 'title' in item else str(item['type'])
        for item in items[:3]
    )
    if len(items) > 3:
        preview += "..."

    # Create batch approval
    batch_data = {
        "title": f"Batch Approval - {len(items)} items",
        "description": f"Approve {len(items)} items: " + preview,
        "items": items,
        "requester": "batch_processor",
        "timeout_hours": 24
    }

    approval_id = create_batch_approval_record(batch_data, approver_email)

    # Send batch notification
    send_approval_notification.send(approver_email, batch_data, approval_id)

    # Wait for batch decision
    approval_result = wait_for_human_approval.send(approval_id, 24).get()

    if approval_result != "approved":
        return {"batch_rejected": True, "reason": approval_result, "items_count": len(items)}

    # Process each item without individual approval
    task_ids = [process_pre_approved_item.send(item).task_id for item in items]
    return {"batch_approved": True, "processed_items": len(items), "task_ids": task_ids}

REST API Endpoints

from typing import List, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from .tasks import (
    batch_approval_workflow,
    deploy_with_approval,
    hy,
    process_expense_report,
)

# Application instance that the @app.post / @app.get route decorators below attach to.
app = FastAPI()

class ExpenseRequest(BaseModel):
    """Request body for POST /expenses/submit."""

    # Free-form expense payload, forwarded unchanged to process_expense_report.
    expense_data: dict
    # Approver passed to the workflow as the first-level approver.
    approver_email: str
    
class DeploymentRequest(BaseModel):
    """Request body for POST /deployments/request."""

    service_name: str
    version: str
    # Target environment; defaults to production when omitted.
    environment: str = "production"
    # Person who must sign off before the deployment runs.
    approver_email: str

class ApprovalDecision(BaseModel):
    """Request body for POST /approvals/decide."""

    approval_id: str
    decision: str  # "approve" or "reject"
    # Optional free-text note from the approver.
    comment: Optional[str] = None

class BatchApprovalRequest(BaseModel):
    """Request body for POST /approvals/batch."""

    # Items forwarded unchanged to batch_approval_workflow.
    items: List[dict]
    approver_email: str
    # Item count at or above which the workflow requests one batch approval.
    batch_threshold: int = 10

@app.post("/expenses/submit")
async def submit_expense(request: ExpenseRequest):
    """Queue the expense workflow and return its task id with a short summary."""
    expense = request.expense_data
    approver = request.approver_email

    queued = process_expense_report.send(expense, approver)

    response = {"message": "Expense submitted for approval"}
    response["task_id"] = queued.task_id
    response["expense_id"] = expense.get("id")
    response["amount"] = expense.get("amount")
    response["approver"] = approver
    return response

@app.post("/deployments/request")
async def request_deployment(request: DeploymentRequest):
    """Queue the approval-gated deployment workflow and echo the request details."""
    service = request.service_name
    version = request.version
    environment = request.environment
    approver = request.approver_email

    queued = deploy_with_approval.send(service, version, environment, approver)

    return {
        "message": "Deployment requested, awaiting approval",
        "task_id": queued.task_id,
        "service": service,
        "version": version,
        "environment": environment,
        "approver": approver,
    }

@app.post("/approvals/decide")
async def make_approval_decision(decision: ApprovalDecision):
    """Record a human approval or rejection.

    The API verb ("approve"/"reject") is translated to the stored status
    ("approved"/"rejected") that the polling task compares against; storing
    the verb itself would leave the waiting workflow running until timeout.
    The status words are also accepted as-is.

    Raises:
        HTTPException: 400 when the decision value is not recognised or the
            status update fails.
    """
    status_by_decision = {
        "approve": "approved",
        "reject": "rejected",
        "approved": "approved",
        "rejected": "rejected",
    }
    status = status_by_decision.get(decision.decision)
    # Validate outside the try block so this 400 is not re-wrapped below.
    if status is None:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid decision '{decision.decision}': expected 'approve' or 'reject'",
        )

    try:
        update_approval_status(decision.approval_id, status, decision.comment)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {
        "message": f"Approval {decision.decision} recorded",
        "approval_id": decision.approval_id,
        "decision": decision.decision
    }

@app.get("/approvals/pending")
async def get_pending_approvals(approver_email: str):
    """List the approvals still awaiting a decision from ``approver_email``.

    Any failure while loading or counting them is reported as HTTP 500.
    """
    try:
        waiting = get_pending_approvals_for_user(approver_email)
        payload = dict(
            pending_approvals=waiting,
            count=len(waiting),
            approver_email=approver_email,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return payload

@app.get("/approvals/{approval_id}")
async def get_approval_details(approval_id: str):
    """Return the stored approval record, or HTTP 404 when the lookup is empty."""
    record = get_approval_by_id(approval_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Approval not found")

@app.post("/approvals/batch")
async def submit_batch_approval(request: BatchApprovalRequest):
    """Queue the batch approval workflow and report what was submitted."""
    items = request.items
    threshold = request.batch_threshold

    queued = batch_approval_workflow.send(items, request.approver_email, threshold)

    summary = {"message": "Batch approval workflow started", "task_id": queued.task_id}
    summary["items_count"] = len(items)
    summary["batch_threshold"] = threshold
    return summary

@app.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
    """Report the state of an approval workflow task.

    ``result`` is populated only when the task is complete and ``error`` only
    when it has failed; ``progress`` is included when the task exposes it.

    Raises:
        HTTPException: 404 when no task is returned for ``task_id``.
    """
    task = hy.get_task(task_id)
    # NOTE(review): assumes hy.get_task returns None for an unknown id -
    # confirm against the Hyrex API. If it raises instead, this guard is
    # simply never hit.
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.is_complete else None,
        "error": task.error if task.has_failed else None,
        "created_at": task.created_at,
        "completed_at": task.completed_at,
        "progress": getattr(task, 'progress', None)
    }

Usage Examples

Submit Expense and Deployment Requests for Approval

# Submit expense report
curl -X POST http://localhost:8000/expenses/submit \
  -H "Content-Type: application/json" \
  -d '{
    "expense_data": {
      "id": "exp_123",
      "amount": 750.00,
      "description": "Client dinner and taxi",
      "submitter_email": "john@company.com",
      "department": "sales",
      "receipt_urls": ["https://receipts.com/123.jpg"]
    },
    "approver_email": "manager@company.com"
  }'

# Request deployment approval
curl -X POST http://localhost:8000/deployments/request \
  -H "Content-Type: application/json" \
  -d '{
    "service_name": "user-service",
    "version": "v2.1.0",
    "environment": "production",
    "approver_email": "devops@company.com"
  }'

Make Approval Decisions

# Approve a request
curl -X POST http://localhost:8000/approvals/decide \
  -H "Content-Type: application/json" \
  -d '{
    "approval_id": "app_123",
    "decision": "approve",
    "comment": "Looks good, approved for processing"
  }'

# Reject a request
curl -X POST http://localhost:8000/approvals/decide \
  -H "Content-Type: application/json" \
  -d '{
    "approval_id": "app_124",
    "decision": "reject",
    "comment": "Missing receipt for taxi fare"
  }'

Check Status and Pending Approvals

# Get pending approvals for a user
curl "http://localhost:8000/approvals/pending?approver_email=manager@company.com"

# Check task status
curl http://localhost:8000/tasks/task_12345/status

# Get approval details
curl http://localhost:8000/approvals/app_123

Advanced Approval Patterns

Multi-Level Approval

@hy.task
def multi_level_approval(request_data: dict, approval_chain: list):
    """Run a request through each approver in ``approval_chain``, in order.

    Every level gets its own approval record and notification. The first
    level that does not return ``"approved"`` (rejection or timeout) stops
    the chain.

    Args:
        request_data: Request details shared by every level. The notification
            task reads ``title``, ``description`` and ``requester`` from it.
        approval_chain: Ordered dicts with ``approver_email`` and
            ``level_name``, plus an optional ``timeout_hours`` (default 24).

    Returns:
        ``{"status": "fully_approved", ...}`` when every level approves,
        otherwise a ``"rejected_at_level"`` dict naming the level and reason.
    """
    total_levels = len(approval_chain)

    for current_level, approver_config in enumerate(approval_chain):
        approver_email = approver_config['approver_email']
        timeout_hours = approver_config.get('timeout_hours', 24)

        level_data = {
            **request_data,
            'current_level': current_level,
            'total_levels': total_levels,
            'level_name': approver_config['level_name'],
            # Carried in the record dict, matching the other
            # create_approval_record call sites in this module.
            'approver_email': approver_email,
            # Keeps the timeout quoted in the email equal to the actual wait.
            'timeout_hours': timeout_hours,
        }

        approval_id = create_approval_record(level_data)

        send_approval_notification.send(approver_email, level_data, approval_id)

        result = wait_for_human_approval.send(approval_id, timeout_hours).get()

        if result != "approved":
            return {
                "status": "rejected_at_level",
                "level": current_level,
                "level_name": approver_config['level_name'],
                "reason": result
            }

    return {
        "status": "fully_approved",
        "levels_completed": total_levels
    }

Conditional Approval Logic

@hy.task
def smart_approval_routing(request_data: dict):
    """Apply the first matching approval rule for the request's type.

    Rules are evaluated in order. A rule whose condition matches and whose
    action is ``auto_approve``, ``require_approval`` or ``escalate`` decides
    the outcome; a matching rule with any other action is passed over. When
    nothing decides, the request is flagged for manual review.
    """
    for rule in get_approval_rules(request_data['type']):
        if not evaluate_condition(rule['condition'], request_data):
            continue

        action = rule['action']
        if action == 'auto_approve':
            return {"status": "auto_approved", "rule": rule['name']}
        if action == 'require_approval':
            return process_approval_requirement(request_data, rule['approvers'])
        if action == 'escalate':
            return escalate_to_committee(request_data, rule['committee'])

    # Nothing decided the request.
    return {"status": "no_matching_rule", "requires_manual_review": True}

Frontend Integration

Approval Dashboard Component

// React component for approval dashboard
function ApprovalDashboard({ userEmail }) {
  const [pendingApprovals, setPendingApprovals] = useState([]);
  
  useEffect(() => {
    fetchPendingApprovals(userEmail).then(setPendingApprovals);
  }, [userEmail]);
  
  const handleApproval = async (approvalId, decision, comment) => {
    await fetch('/approvals/decide', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ approvalId, decision, comment })
    });
    
    // Refresh pending approvals
    const updated = await fetchPendingApprovals(userEmail);
    setPendingApprovals(updated);
  };
  
  return (
    <div className="approval-dashboard">
      <h2>Pending Approvals ({pendingApprovals.length})</h2>
      {pendingApprovals.map(approval => (
        <ApprovalCard 
          key={approval.id}
          approval={approval}
          onApprove={(comment) => handleApproval(approval.id, 'approve', comment)}
          onReject={(comment) => handleApproval(approval.id, 'reject', comment)}
        />
      ))}
    </div>
  );
}

Production Considerations

  • Timeout strategies: Implement appropriate timeouts with escalation
  • Notification reliability: Use multiple channels (email, Slack, SMS) for critical approvals
  • Audit requirements: Maintain complete audit trails for compliance
  • Mobile access: Ensure approvers can respond from mobile devices
  • Backup approvers: Handle approver unavailability with delegation
  • Bulk operations: Optimize for high-volume approval scenarios

Next Steps