Human-in-the-Loop with Hyrex
Some decisions are too important for automation alone. Expense approvals, production deployments, content moderation — these critical processes need human judgment. But how do you seamlessly blend automated workflows with human decision-making?
With Hyrex, you can build workflows that pause for human input, send notifications to the right people, wait for decisions with timeouts, and gracefully handle approvals or rejections. Turn complex approval processes into reliable, trackable workflows that never lose track of pending decisions.
Step 1: Define Approval Workflows
Create Hyrex tasks that handle the complete approval lifecycle — validation, notification, waiting for human decisions, and processing the results. These workflows can include timeouts, escalation rules, and multiple approval stages.
from hyrex import HyrexRegistry
import time
from enum import Enum

# Registry that the @hy.task decorators in this module attach their tasks to.
hy = HyrexRegistry()
6
class ApprovalStatus(Enum):
    """States an approval request can be in."""

    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
11
@hy.task
def send_approval_notification(approver_email: str, task_data: dict, approval_id: str):
    """Email the approver a summary of the request plus approve/reject links.

    Args:
        approver_email: Address the notification is sent to.
        task_data: Request details. The ``title``, ``description`` and
            ``requester`` keys are shown in the email. A missing key is
            rendered as "(not provided)" rather than raising ``KeyError``,
            since callers may pass a payload that lacks them.
        approval_id: Identifier embedded in the approve and reject URLs.
    """
    missing = "(not provided)"

    # Assembled line by line so the message carries no source-code indentation.
    email_content = "\n".join(
        [
            "New task requires your approval:",
            "",
            f"Task: {task_data.get('title', missing)}",
            f"Description: {task_data.get('description', missing)}",
            f"Requested by: {task_data.get('requester', missing)}",
            "",
            f"Approve: https://yourapp.com/approvals/{approval_id}/approve",
            f"Reject: https://yourapp.com/approvals/{approval_id}/reject",
        ]
    )

    # Send email using your preferred service (send_email is not defined in
    # this listing; plug in your own mail integration).
    send_email(
        to=approver_email,
        subject="Task Approval Required",
        body=email_content,
    )
32
@hy.task
def wait_for_human_approval(
    approval_id: str,
    timeout_hours: int = 24,
    poll_interval_seconds: int = 30,
) -> str:
    """Poll the approval record until a decision is made or the timeout expires.

    The status is always checked at least once, so a decision that is already
    recorded is returned even when ``timeout_hours`` is zero.

    Args:
        approval_id: Identifier passed to ``get_approval_status``.
        timeout_hours: Maximum time to wait for a decision, in hours.
        poll_interval_seconds: Delay between status checks; must be positive.

    Returns:
        ``"approved"``, ``"rejected"``, or ``"timeout"``.
    """
    # NOTE(review): this task sleeps in-process for the whole wait, so it
    # presumably occupies a worker until it returns; confirm worker capacity.

    # Monotonic clock: wall-clock adjustments cannot shorten or stretch the wait.
    deadline = time.monotonic() + timeout_hours * 3600

    while True:
        # Check approval status in database (get_approval_status is not
        # defined in this listing).
        approval = get_approval_status(approval_id)

        # Accept either an ApprovalStatus member or its raw string value.
        status = approval.status
        if isinstance(status, ApprovalStatus):
            status = status.value

        if status == ApprovalStatus.APPROVED.value:
            return "approved"
        if status == ApprovalStatus.REJECTED.value:
            return "rejected"

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Timeout reached
            return "timeout"

        # Wait before checking again, but never sleep past the deadline.
        time.sleep(min(poll_interval_seconds, remaining))
53
@hy.task
def process_expense_report(expense_data: dict, approver_email: str):
    """Validate an expense report, obtain approval if required, then pay it.

    Expenses above 500 require a human decision. Payment happens only when the
    expense needs no approval or the approval result is exactly ``"approved"``.

    Args:
        expense_data: Expense payload; the ``amount``, ``id`` and
            ``submitter_email`` keys are read here.
        approver_email: Address of the person asked to approve.

    Returns:
        A dict whose ``status`` is ``"processed"``, ``"rejected"`` or
        ``"escalated"``.

    Raises:
        ValueError: If the expense data fails validation.
        RuntimeError: If the approval task returns an unrecognised result.
    """
    # Step 1: Validate expense data
    validation_result = validate_expense_data(expense_data)
    if not validation_result.is_valid:
        raise ValueError(f"Invalid expense data: {validation_result.errors}")

    # Step 2: Check if approval is needed (e.g., amount > $500)
    if expense_data["amount"] > 500:
        approval_id = create_approval_record(expense_data)

        # Send notification to approver
        send_approval_notification.send(approver_email, expense_data, approval_id)

        # Wait for human decision (48 hour timeout).
        # NOTE(review): .get() presumably blocks this task until the child
        # task finishes; confirm there are enough workers for both to run.
        approval_result = wait_for_human_approval.send(approval_id, 48).get()

        if approval_result == "rejected":
            update_expense_status(expense_data["id"], "rejected")
            send_rejection_notification(expense_data["submitter_email"])
            return {"status": "rejected", "reason": "Manager approval denied"}

        if approval_result == "timeout":
            escalate_approval(approval_id, expense_data)
            return {"status": "escalated", "reason": "Approval timeout"}

        if approval_result != "approved":
            # Fail closed: never pay out on a missing or unknown result.
            raise RuntimeError(
                f"Unexpected approval result for {approval_id}: {approval_result!r}"
            )

    # Step 3: Process approved expense
    process_payment(expense_data)
    update_expense_status(expense_data["id"], "processed")
    send_confirmation(expense_data["submitter_email"])

    return {"status": "processed", "amount": expense_data["amount"]}
91
92@hy.task
93def deploy_with_approval(service_name: str, version: str, approver_email: str):
94 """Deploy service after human approval"""
95
96 deployment_data = {
97 "title": f"Deploy {service_name} v{version}",
98 "description": f"Deploy {service_name} version {version} to production",
99 "service": service_name,
100 "version": version,
101 "requester": "CI/CD Pipeline"
102 }
103
104 # Create approval request
105 approval_id = create_approval_record(deployment_data)
106
107 # Notify approver
108 send_approval_notification.send(
109 approver_email,
110 deployment_data,
111 approval_id
112 )
113
114 # Wait for approval
115 approval_result = wait_for_human_approval.send(approval_id, 2).get() # 2 hour timeout
116
117 if approval_result == "approved":
118 # Proceed with deployment
119 deploy_service(service_name, version)
120 return {"status": "deployed", "service": service_name, "version": version}
121 else:
122 return {"status": "deployment_cancelled", "reason": approval_result}Step 2: Build Approval APIs and Interfaces
Create endpoints that trigger approval workflows and allow humans to make decisions. These APIs handle submitting requests, tracking pending approvals, and recording the human decisions that resume the automated workflows.
from typing import Literal, Optional

from fastapi import FastAPI
from pydantic import BaseModel

from .tasks import process_expense_report, deploy_with_approval
4
# FastAPI application that the route decorators below register endpoints on.
app = FastAPI()
6
class ExpenseRequest(BaseModel):
    """Request body for submitting an expense report."""

    expense_data: dict
    approver_email: str
10
class DeploymentRequest(BaseModel):
    """Request body for requesting an approved deployment."""

    service_name: str
    version: str
    approver_email: str
15
class ApprovalDecision(BaseModel):
    """Request body recording a human decision on a pending approval."""

    approval_id: str
    # Restricted to the two documented values so typos are rejected at
    # validation time instead of being stored.
    decision: Literal["approve", "reject"]
    # Optional free-text note; None when the approver leaves no comment.
    comment: Optional[str] = None
20
@app.post("/expenses/submit")
async def submit_expense(request: ExpenseRequest):
    """Submit an expense for processing with human approval.

    Sends the expense to the ``process_expense_report`` task and returns the
    task id together with the expense id (``None`` if the payload has no
    ``id`` key).
    """
    task = process_expense_report.send(
        request.expense_data,
        request.approver_email,
    )

    return {
        "message": "Expense submitted for approval",
        "task_id": task.task_id,
        "expense_id": request.expense_data.get("id"),
    }
34
@app.post("/deployments/request")
async def request_deployment(request: DeploymentRequest):
    """Request a deployment that proceeds only after human approval.

    Sends the request to the ``deploy_with_approval`` task and returns the
    task id along with the requested service and version.
    """
    task = deploy_with_approval.send(
        request.service_name,
        request.version,
        request.approver_email,
    )

    return {
        "message": "Deployment requested, awaiting approval",
        "task_id": task.task_id,
        "service": request.service_name,
        "version": request.version,
    }
50
@app.post("/approvals/decide")
async def make_approval_decision(decision: ApprovalDecision):
    """Record a human approval or rejection for a pending approval."""
    # NOTE(review): update_approval_status is neither defined nor imported in
    # this listing; supply it from your persistence layer.
    # NOTE(review): the request carries "approve"/"reject", while
    # ApprovalStatus values are "approved"/"rejected"; confirm that
    # update_approval_status maps between them, otherwise the waiting task
    # will never see a decision.
    update_approval_status(
        decision.approval_id,
        decision.decision,
        decision.comment,
    )

    return {
        "message": f"Approval {decision.decision} recorded",
        "approval_id": decision.approval_id,
    }
64
65@app.get("/approvals/pending")
66async def get_pending_approvals(approver_email: str):
67 # Get all pending approvals for an approver
68 pending_approvals = get_pending_approvals_for_user(approver_email)
69
70 return {
71 "pending_approvals": pending_approvals,
72 "count": len(pending_approvals)
73 }Perfect balance of automation and human control!
Now your workflows automatically handle routine tasks while seamlessly pausing for human judgment when needed. Approvers get notified instantly, decisions are tracked reliably, and workflows resume exactly where they left off — no manual coordination required.
Enhance further with approval hierarchies, conditional routing based on request values, bulk approval interfaces, or integration with tools like Slack for in-context decision making.
Explore other use cases
Ecommerce Checkout Flows
Handle payment processing, inventory updates, and order fulfillment reliably.
User Onboarding
Orchestrate multi-step user setup flows with email notifications and approvals.
Background Tasks
Schedule and run long-running jobs with automatic retries and monitoring.
Agent Actions
Execute AI agent actions as durable, observable tasks.