Overview
This example demonstrates human-in-the-loop patterns:- Approval workflows with email notifications and web interfaces
- Timeout handling with escalation mechanisms
- Multi-step processes that pause for human decisions
- Expense processing with manager approval requirements
- Deployment gates requiring human sign-off
- Audit trails for all approval decisions
Task Definitions
Copy
Ask AI
from hyrex import HyrexRegistry
import time
from enum import Enum
from datetime import datetime, timedelta
hy = HyrexRegistry()
class ApprovalStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
@hy.task
def send_approval_notification(approver_email: str, task_data: dict, approval_id: str):
"""Send email notification to human approver"""
email_content = f"""
New task requires your approval:
Task: {task_data['title']}
Description: {task_data['description']}
Requested by: {task_data['requester']}
Amount: {task_data.get('amount', 'N/A')}
Approve: https://yourapp.com/approvals/{approval_id}/approve
Reject: https://yourapp.com/approvals/{approval_id}/reject
This approval will timeout in {task_data.get('timeout_hours', 24)} hours.
"""
send_email(
to=approver_email,
subject="Task Approval Required",
body=email_content,
priority="high"
)
return {"notification_sent": True, "approver": approver_email}
@hy.task
def wait_for_human_approval(approval_id: str, timeout_hours: int = 24) -> str:
"""Wait for human approval with timeout"""
start_time = time.time()
timeout_seconds = timeout_hours * 3600
check_interval = 30 # seconds
while time.time() - start_time < timeout_seconds:
# Check approval status in database
approval = get_approval_status(approval_id)
if approval['status'] == ApprovalStatus.APPROVED.value:
record_approval_completion(approval_id, "approved", approval.get('comment'))
return "approved"
elif approval['status'] == ApprovalStatus.REJECTED.value:
record_approval_completion(approval_id, "rejected", approval.get('comment'))
return "rejected"
# Wait before checking again
time.sleep(check_interval)
# Timeout reached
record_approval_completion(approval_id, "timeout", "Approval timeout exceeded")
return "timeout"
@hy.task
def escalate_approval(approval_id: str, original_data: dict, escalation_level: int = 1):
"""Escalate approval to higher authority"""
escalation_approvers = get_escalation_chain(original_data.get('department'))
if escalation_level >= len(escalation_approvers):
# No more escalation levels, auto-reject
update_approval_status(approval_id, "rejected", "Maximum escalation reached")
return {"status": "auto_rejected", "reason": "max_escalation"}
next_approver = escalation_approvers[escalation_level]
escalated_data = {
**original_data,
'title': f"[ESCALATED] {original_data['title']}",
'escalation_level': escalation_level,
'original_approver': original_data.get('approver_email'),
'escalation_reason': 'Previous approval timeout'
}
# Update approval record with new approver
update_approval_approver(approval_id, next_approver['email'])
# Send notification to escalated approver
send_approval_notification.send(
next_approver['email'],
escalated_data,
approval_id
)
return {
"escalated_to": next_approver['email'],
"escalation_level": escalation_level,
"approval_id": approval_id
}
@hy.task
def process_expense_report(expense_data: dict, approver_email: str):
"""Process expense report with human approval"""
# Step 1: Validate expense data
validation_result = validate_expense_data(expense_data)
if not validation_result['is_valid']:
raise Exception(f"Invalid expense data: {validation_result['errors']}")
# Step 2: Check if approval is needed
approval_threshold = get_approval_threshold(expense_data.get('department', 'default'))
if expense_data["amount"] > approval_threshold:
approval_id = create_approval_record({
**expense_data,
'title': f"Expense Report - ${expense_data['amount']:.2f}",
'description': expense_data.get('description', 'Expense reimbursement request'),
'requester': expense_data.get('submitter_email'),
'approver_email': approver_email,
'timeout_hours': 48
})
# Send notification to approver
send_approval_notification.send(
approver_email,
expense_data,
approval_id
)
# Wait for human decision
approval_result = wait_for_human_approval.send(approval_id, 48).get()
if approval_result == "rejected":
update_expense_status(expense_data["id"], "rejected")
send_rejection_notification.send(
expense_data["submitter_email"],
expense_data,
get_approval_comment(approval_id)
)
return {"status": "rejected", "reason": "Manager approval denied"}
elif approval_result == "timeout":
escalation_result = escalate_approval.send(
approval_id,
expense_data,
1
).get()
# Wait for escalated approval
escalated_result = wait_for_human_approval.send(approval_id, 24).get()
if escalated_result != "approved":
update_expense_status(expense_data["id"], "rejected")
return {"status": "rejected", "reason": f"Escalation {escalated_result}"}
# Step 3: Process approved expense
payment_result = process_payment.send(expense_data).get()
update_expense_status(expense_data["id"], "processed")
send_confirmation.send(
expense_data["submitter_email"],
expense_data,
payment_result
)
return {
"status": "processed",
"amount": expense_data["amount"],
"payment_id": payment_result.get("payment_id")
}
@hy.task
def deploy_with_approval(service_name: str, version: str, environment: str, approver_email: str):
"""Deploy service after human approval"""
deployment_data = {
"title": f"Deploy {service_name} v{version} to {environment}",
"description": f"Deploy {service_name} version {version} to {environment} environment",
"service": service_name,
"version": version,
"environment": environment,
"requester": "CI/CD Pipeline",
"timeout_hours": 2 # Shorter timeout for deployments
}
# Create approval request
approval_id = create_approval_record({
**deployment_data,
'approver_email': approver_email
})
# Notify approver
send_approval_notification.send(
approver_email,
deployment_data,
approval_id
)
# Wait for approval
approval_result = wait_for_human_approval.send(approval_id, 2).get()
if approval_result == "approved":
# Proceed with deployment
deployment_result = deploy_service.send(
service_name,
version,
environment
).get()
# Send success notification
send_deployment_notification.send(
approver_email,
"success",
deployment_data,
deployment_result
)
return {
"status": "deployed",
"service": service_name,
"version": version,
"environment": environment,
"deployment_id": deployment_result.get("deployment_id")
}
else:
# Send cancellation notification
send_deployment_notification.send(
approver_email,
"cancelled",
deployment_data,
{"reason": approval_result}
)
return {
"status": "deployment_cancelled",
"reason": approval_result,
"service": service_name,
"version": version
}
@hy.task
def batch_approval_workflow(items: list, approver_email: str, batch_threshold: int = 10):
"""Process multiple items requiring approval in batches"""
if len(items) < batch_threshold:
# Process individually
results = []
for item in items:
if item['type'] == 'expense':
result = process_expense_report.send(item['data'], approver_email)
elif item['type'] == 'deployment':
result = deploy_with_approval.send(
item['data']['service'],
item['data']['version'],
item['data']['environment'],
approver_email
)
results.append(result.task_id)
return {"individual_processing": True, "task_ids": results}
# Create batch approval
batch_data = {
"title": f"Batch Approval - {len(items)} items",
"description": f"Approve {len(items)} items: " + ", ".join([item.get('title', f"{item['type']}") for item in items[:3]]) + ("..." if len(items) > 3 else ""),
"items": items,
"requester": "batch_processor",
"timeout_hours": 24
}
approval_id = create_batch_approval_record(batch_data, approver_email)
# Send batch notification
send_approval_notification.send(approver_email, batch_data, approval_id)
# Wait for batch decision
approval_result = wait_for_human_approval.send(approval_id, 24).get()
if approval_result == "approved":
# Process all items
results = []
for item in items:
# Process each item without individual approval
result = process_pre_approved_item.send(item)
results.append(result.task_id)
return {"batch_approved": True, "processed_items": len(items), "task_ids": results}
else:
return {"batch_rejected": True, "reason": approval_result, "items_count": len(items)}
REST API Endpoints
Copy
Ask AI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from .tasks import process_expense_report, deploy_with_approval, batch_approval_workflow
app = FastAPI()
class ExpenseRequest(BaseModel):
expense_data: dict
approver_email: str
class DeploymentRequest(BaseModel):
service_name: str
version: str
environment: str = "production"
approver_email: str
class ApprovalDecision(BaseModel):
approval_id: str
decision: str # "approve" or "reject"
comment: Optional[str] = None
class BatchApprovalRequest(BaseModel):
items: List[dict]
approver_email: str
batch_threshold: int = 10
@app.post("/expenses/submit")
async def submit_expense(request: ExpenseRequest):
"""Submit expense for processing with human approval"""
task = process_expense_report.send(
request.expense_data,
request.approver_email
)
return {
"message": "Expense submitted for approval",
"task_id": task.task_id,
"expense_id": request.expense_data.get("id"),
"amount": request.expense_data.get("amount"),
"approver": request.approver_email
}
@app.post("/deployments/request")
async def request_deployment(request: DeploymentRequest):
"""Request deployment with approval"""
task = deploy_with_approval.send(
request.service_name,
request.version,
request.environment,
request.approver_email
)
return {
"message": "Deployment requested, awaiting approval",
"task_id": task.task_id,
"service": request.service_name,
"version": request.version,
"environment": request.environment,
"approver": request.approver_email
}
@app.post("/approvals/decide")
async def make_approval_decision(decision: ApprovalDecision):
"""Record human approval/rejection decision"""
try:
update_approval_status(
decision.approval_id,
decision.decision,
decision.comment
)
return {
"message": f"Approval {decision.decision} recorded",
"approval_id": decision.approval_id,
"decision": decision.decision
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/approvals/pending")
async def get_pending_approvals(approver_email: str):
"""Get all pending approvals for an approver"""
try:
pending_approvals = get_pending_approvals_for_user(approver_email)
return {
"pending_approvals": pending_approvals,
"count": len(pending_approvals),
"approver_email": approver_email
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/approvals/{approval_id}")
async def get_approval_details(approval_id: str):
"""Get detailed information about a specific approval"""
approval = get_approval_by_id(approval_id)
if not approval:
raise HTTPException(status_code=404, detail="Approval not found")
return approval
@app.post("/approvals/batch")
async def submit_batch_approval(request: BatchApprovalRequest):
"""Submit batch of items for approval"""
task = batch_approval_workflow.send(
request.items,
request.approver_email,
request.batch_threshold
)
return {
"message": "Batch approval workflow started",
"task_id": task.task_id,
"items_count": len(request.items),
"batch_threshold": request.batch_threshold
}
@app.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
"""Get status of approval workflow task"""
task = hy.get_task(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.is_complete else None,
"error": task.error if task.has_failed else None,
"created_at": task.created_at,
"completed_at": task.completed_at,
"progress": getattr(task, 'progress', None)
}
Usage Examples
Submit Expense for Approval
Copy
Ask AI
# Submit expense report
curl -X POST http://localhost:8000/expenses/submit \
-H "Content-Type: application/json" \
-d '{
"expense_data": {
"id": "exp_123",
"amount": 750.00,
"description": "Client dinner and taxi",
"submitter_email": "john@company.com",
"department": "sales",
"receipt_urls": ["https://receipts.com/123.jpg"]
},
"approver_email": "manager@company.com"
}'
# Request deployment approval
curl -X POST http://localhost:8000/deployments/request \
-H "Content-Type: application/json" \
-d '{
"service_name": "user-service",
"version": "v2.1.0",
"environment": "production",
"approver_email": "devops@company.com"
}'
Make Approval Decisions
Copy
Ask AI
# Approve a request
curl -X POST http://localhost:8000/approvals/decide \
-H "Content-Type: application/json" \
-d '{
"approval_id": "app_123",
"decision": "approve",
"comment": "Looks good, approved for processing"
}'
# Reject a request
curl -X POST http://localhost:8000/approvals/decide \
-H "Content-Type: application/json" \
-d '{
"approval_id": "app_124",
"decision": "reject",
"comment": "Missing receipt for taxi fare"
}'
Check Status and Pending Approvals
Copy
Ask AI
# Get pending approvals for a user
curl "http://localhost:8000/approvals/pending?approver_email=manager@company.com"
# Check task status
curl http://localhost:8000/tasks/task_12345/status
# Get approval details
curl http://localhost:8000/approvals/app_123
Advanced Approval Patterns
Multi-Level Approval
Copy
Ask AI
@hy.task
def multi_level_approval(request_data: dict, approval_chain: list):
"""Process request through multiple approval levels"""
current_level = 0
for approver_config in approval_chain:
level_data = {
**request_data,
'current_level': current_level,
'total_levels': len(approval_chain),
'level_name': approver_config['level_name']
}
approval_id = create_approval_record(level_data, approver_config['approver_email'])
send_approval_notification.send(
approver_config['approver_email'],
level_data,
approval_id
)
result = wait_for_human_approval.send(
approval_id,
approver_config.get('timeout_hours', 24)
).get()
if result != "approved":
return {
"status": "rejected_at_level",
"level": current_level,
"level_name": approver_config['level_name'],
"reason": result
}
current_level += 1
return {
"status": "fully_approved",
"levels_completed": len(approval_chain)
}
Conditional Approval Logic
Copy
Ask AI
@hy.task
def smart_approval_routing(request_data: dict):
"""Route approval based on request attributes"""
approval_rules = get_approval_rules(request_data['type'])
for rule in approval_rules:
if evaluate_condition(rule['condition'], request_data):
if rule['action'] == 'auto_approve':
return {"status": "auto_approved", "rule": rule['name']}
elif rule['action'] == 'require_approval':
return process_approval_requirement(request_data, rule['approvers'])
elif rule['action'] == 'escalate':
return escalate_to_committee(request_data, rule['committee'])
# Default fallback
return {"status": "no_matching_rule", "requires_manual_review": True}
Frontend Integration
Approval Dashboard Component
Copy
Ask AI
// React component for approval dashboard
function ApprovalDashboard({ userEmail }) {
const [pendingApprovals, setPendingApprovals] = useState([]);
useEffect(() => {
fetchPendingApprovals(userEmail).then(setPendingApprovals);
}, [userEmail]);
const handleApproval = async (approvalId, decision, comment) => {
await fetch('/approvals/decide', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ approvalId, decision, comment })
});
// Refresh pending approvals
const updated = await fetchPendingApprovals(userEmail);
setPendingApprovals(updated);
};
return (
<div className="approval-dashboard">
<h2>Pending Approvals ({pendingApprovals.length})</h2>
{pendingApprovals.map(approval => (
<ApprovalCard
key={approval.id}
approval={approval}
onApprove={(comment) => handleApproval(approval.id, 'approve', comment)}
onReject={(comment) => handleApproval(approval.id, 'reject', comment)}
/>
))}
</div>
);
}
Production Considerations
- Timeout strategies: Implement appropriate timeouts with escalation
- Notification reliability: Use multiple channels (email, Slack, SMS) for critical approvals
- Audit requirements: Maintain complete audit trails for compliance
- Mobile access: Ensure approvers can respond from mobile devices
- Backup approvers: Handle approver unavailability with delegation
- Bulk operations: Optimize for high-volume approval scenarios