Human-in-the-Loop with Hyrex

Some decisions are too important for automation alone. Expense approvals, production deployments, content moderation - these critical processes need human judgment. But how do you seamlessly blend automated workflows with human decision-making?

With Hyrex, you can build workflows that pause for human input, send notifications to the right people, wait for decisions with timeouts, and gracefully handle approvals or rejections. Turn complex approval processes into reliable, trackable workflows that never lose track of pending decisions.

Human-in-the-Loop Workflow

Step 1: Define Approval Workflows

Create Hyrex tasks that handle the complete approval lifecycle - validation, notification, waiting for human decisions, and processing the results. These workflows can include timeouts, escalation rules, and multiple approval stages.

src/hyrex/approval_tasks.py
1from hyrex import HyrexRegistry
2import time
3from enum import Enum
4
5hy = HyrexRegistry()
6
7class ApprovalStatus(Enum):
8    PENDING = "pending"
9    APPROVED = "approved"
10    REJECTED = "rejected"
11
12@hy.task
13def send_approval_notification(approver_email: str, task_data: dict, approval_id: str):
14    """Send email notification to human approver"""
15    email_content = f"""
16    New task requires your approval:
17
18    Task: {task_data['title']}
19    Description: {task_data['description']}
20    Requested by: {task_data['requester']}
21
22    Approve: https://yourapp.com/approvals/{approval_id}/approve
23    Reject: https://yourapp.com/approvals/{approval_id}/reject
24    """
25
26    # Send email using your preferred service
27    send_email(
28        to=approver_email,
29        subject="Task Approval Required",
30        body=email_content
31    )
32
33@hy.task
34def wait_for_human_approval(approval_id: str, timeout_hours: int = 24) -> str:
35    """Wait for human approval with timeout"""
36    start_time = time.time()
37    timeout_seconds = timeout_hours * 3600
38
39    while time.time() - start_time < timeout_seconds:
40        # Check approval status in database
41        approval = get_approval_status(approval_id)
42
43        if approval.status == ApprovalStatus.APPROVED:
44            return "approved"
45        elif approval.status == ApprovalStatus.REJECTED:
46            return "rejected"
47
48        # Wait before checking again
49        time.sleep(30)
50
51    # Timeout reached
52    return "timeout"
53
54@hy.task
55def process_expense_report(expense_data: dict, approver_email: str):
56    """Process expense report with human approval"""
57
58    # Step 1: Validate expense data
59    validation_result = validate_expense_data(expense_data)
60    if not validation_result.is_valid:
61        raise Exception(f"Invalid expense data: {validation_result.errors}")
62
63    # Step 2: Check if approval is needed (e.g., amount > $500)
64    if expense_data["amount"] > 500:
65        approval_id = create_approval_record(expense_data)
66
67        # Send notification to approver
68        send_approval_notification.send(
69            approver_email,
70            expense_data,
71            approval_id
72        )
73
74        # Wait for human decision
75        approval_result = wait_for_human_approval.send(approval_id, 48).get()
76
77        if approval_result == "rejected":
78            update_expense_status(expense_data["id"], "rejected")
79            send_rejection_notification(expense_data["submitter_email"])
80            return {"status": "rejected", "reason": "Manager approval denied"}
81        elif approval_result == "timeout":
82            escalate_approval(approval_id, expense_data)
83            return {"status": "escalated", "reason": "Approval timeout"}
84
85    # Step 3: Process approved expense
86    process_payment(expense_data)
87    update_expense_status(expense_data["id"], "processed")
88    send_confirmation(expense_data["submitter_email"])
89
90    return {"status": "processed", "amount": expense_data["amount"]}
91
92@hy.task
93def deploy_with_approval(service_name: str, version: str, approver_email: str):
94    """Deploy service after human approval"""
95
96    deployment_data = {
97        "title": f"Deploy {service_name} v{version}",
98        "description": f"Deploy {service_name} version {version} to production",
99        "service": service_name,
100        "version": version,
101        "requester": "CI/CD Pipeline"
102    }
103
104    # Create approval request
105    approval_id = create_approval_record(deployment_data)
106
107    # Notify approver
108    send_approval_notification.send(
109        approver_email,
110        deployment_data,
111        approval_id
112    )
113
114    # Wait for approval
115    approval_result = wait_for_human_approval.send(approval_id, 2).get()  # 2 hour timeout
116
117    if approval_result == "approved":
118        # Proceed with deployment
119        deploy_service(service_name, version)
120        return {"status": "deployed", "service": service_name, "version": version}
121    else:
122        return {"status": "deployment_cancelled", "reason": approval_result}

Step 2: Build Approval APIs and Interfaces

Create endpoints that trigger approval workflows and allow humans to make decisions. These APIs handle the submission of requests, tracking pending approvals, and recording human decisions that resume the automated workflows.

src/routes/approval_api.py
1from fastapi import FastAPI
2from pydantic import BaseModel
3from .tasks import process_expense_report, deploy_with_approval
4
5app = FastAPI()
6
7class ExpenseRequest(BaseModel):
8    expense_data: dict
9    approver_email: str
10
11class DeploymentRequest(BaseModel):
12    service_name: str
13    version: str
14    approver_email: str
15
16class ApprovalDecision(BaseModel):
17    approval_id: str
18    decision: str  # "approve" or "reject"
19    comment: str = None
20
21@app.post("/expenses/submit")
22async def submit_expense(request: ExpenseRequest):
23    # Submit expense for processing with human approval
24    task = process_expense_report.send(
25        request.expense_data,
26        request.approver_email
27    )
28
29    return {
30        "message": "Expense submitted for approval",
31        "task_id": task.task_id,
32        "expense_id": request.expense_data.get("id")
33    }
34
35@app.post("/deployments/request")
36async def request_deployment(request: DeploymentRequest):
37    # Request deployment with approval
38    task = deploy_with_approval.send(
39        request.service_name,
40        request.version,
41        request.approver_email
42    )
43
44    return {
45        "message": "Deployment requested, awaiting approval",
46        "task_id": task.task_id,
47        "service": request.service_name,
48        "version": request.version
49    }
50
51@app.post("/approvals/decide")
52async def make_approval_decision(decision: ApprovalDecision):
53    # Record human approval/rejection decision
54    update_approval_status(
55        decision.approval_id,
56        decision.decision,
57        decision.comment
58    )
59
60    return {
61        "message": f"Approval {decision.decision} recorded",
62        "approval_id": decision.approval_id
63    }
64
65@app.get("/approvals/pending")
66async def get_pending_approvals(approver_email: str):
67    # Get all pending approvals for an approver
68    pending_approvals = get_pending_approvals_for_user(approver_email)
69
70    return {
71        "pending_approvals": pending_approvals,
72        "count": len(pending_approvals)
73    }

Perfect balance of automation and human control!

Now your workflows automatically handle routine tasks while seamlessly pausing for human judgment when needed. Approvers get notified instantly, decisions are tracked reliably, and workflows resume exactly where they left off - no manual coordination required.

Enhance further with approval hierarchies, conditional routing based on request values, bulk approval interfaces, or integration with tools like Slack for in-context decision making.