Ecommerce Checkout Flow with Hyrex
Checkout flows are critical but complex - payment processing, inventory management, order creation, and email notifications must work perfectly together. One failure can leave customers charged but without orders, or worse. How do you make checkout bulletproof?
With Hyrex, you can orchestrate reliable checkout workflows that handle payment processing, inventory reservation, order creation, and notifications as atomic operations. Built-in error handling ensures failed payments are rolled back and customers are notified appropriately.
Step 1: Define a Checkout Workflow
Wrap your existing tasks in an explicit Hyrex workflow to get a named, triggerable checkout with clear fan-in/fan-out, durable roots, and centralized failure handling. This makes it easy to trigger from your app or on a schedule, and to inspect runs in the workflow UI.
1from hyrex import HyrexRegistry, get_hyrex_workflow_context
2from pydantic import BaseModel
3from typing import Optional, Dict, Any
4import stripe, os
5from decimal import Decimal
6
7hy = HyrexRegistry()
8
9class CheckoutArgs(BaseModel):
10 cart_id: str
11 customer_id: str
12 customer_email: str
13 payment_method_id: str
14 shipping_address: Dict[str, Any]
15 billing_address: Dict[str, Any]
16 return_url: Optional[str] = None
17
18@hy.task
19def validate_cart_items():
20 """Validate cart items and calculate totals. Reads workflow args."""
21 ctx = get_hyrex_workflow_context()
22 cart_id = ctx.workflow_args.cart_id if ctx and ctx.workflow_args else None
23 cart = get_cart_from_db(cart_id)
24
25 total_amount = Decimal('0.00')
26 validated_items = []
27 for item in cart['items']:
28 product = get_product_by_id(item['product_id'])
29 if product['stock_quantity'] < item['quantity']:
30 raise Exception(f"Insufficient stock for {product['name']}")
31 item_total = product['price'] * item['quantity']
32 total_amount += item_total
33 validated_items.append({
34 'product_id': item['product_id'],
35 'quantity': item['quantity'],
36 'unit_price': product['price'],
37 'total_price': item_total
38 })
39
40 return {
41 'items': validated_items,
42 'subtotal': float(total_amount),
43 'tax_amount': float(calculate_tax(total_amount, cart['shipping_address'])),
44 'shipping_cost': float(calculate_shipping(cart['shipping_address']))
45 }
46
47@hy.task
48def reserve_inventory():
49 """Reserve inventory using result from validate_cart_items."""
50 ctx = get_hyrex_workflow_context()
51 prev = ctx.durable_runs.get('validate_cart_items') if ctx else None
52 if prev:
53 prev.refresh()
54 validation = prev.result or {}
55 else:
56 validation = {}
57
58 for item in validation.get('items', []):
59 success = reserve_product_stock(item['product_id'], item['quantity'])
60 if not success:
61 raise Exception(f"Could not reserve stock for product {item['product_id']}")
62 return { 'reserved': True }
63
64@hy.task
65def process_payment():
66 """Process payment using validation totals and workflow args."""
67 ctx = get_hyrex_workflow_context()
68 prev = ctx.durable_runs.get('validate_cart_items') if ctx else None
69 if prev:
70 prev.refresh()
71 validation = prev.result or {}
72 else:
73 validation = {}
74 args = ctx.workflow_args if ctx else None
75
76 amount = validation.get('subtotal', 0) + validation.get('tax_amount', 0) + validation.get('shipping_cost', 0)
77 stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')
78 payment_intent = stripe.PaymentIntent.create(
79 amount=int(amount * 100),
80 currency='usd',
81 payment_method=args.payment_method_id,
82 confirm=True,
83 return_url=args.return_url if args else None
84 )
85 return { 'payment_intent_id': payment_intent.id, 'status': payment_intent.status, 'amount': amount }
86
87@hy.task
88def create_order():
89 """Create order using validation + payment results and workflow args."""
90 ctx = get_hyrex_workflow_context()
91 args = ctx.workflow_args if ctx else None
92
93 val_run = ctx.durable_runs.get('validate_cart_items') if ctx else None
94 pay_run = ctx.durable_runs.get('process_payment') if ctx else None
95 validation = (val_run.refresh() or True) and val_run.result if val_run else {}
96 payment = (pay_run.refresh() or True) and pay_run.result if pay_run else {}
97
98 order = {
99 'customer_id': args.customer_id,
100 'items': validation.get('items', []),
101 'subtotal': validation.get('subtotal', 0),
102 'tax_amount': validation.get('tax_amount', 0),
103 'shipping_cost': validation.get('shipping_cost', 0),
104 'total_amount': payment.get('amount', 0),
105 'shipping_address': args.shipping_address,
106 'billing_address': args.billing_address,
107 'payment_intent_id': payment.get('payment_intent_id'),
108 'status': 'confirmed'
109 }
110 order_id = save_order_to_db(order)
111 return { 'order_id': order_id, 'order': order }
112
113@hy.task
114def send_order_confirmation():
115 ctx = get_hyrex_workflow_context()
116 args = ctx.workflow_args if ctx else None
117 order_run = ctx.durable_runs.get('create_order') if ctx else None
118 order_run.refresh() if order_run else None
119 order = (order_run.result or {}).get('order') if order_run else None
120
121 if not order:
122 raise Exception('Order not found')
123
124 # Build email
125 email_content = f"Order #: {order.get('payment_intent_id')}
126Total: ${order.get('total_amount'):.2f}
127"
128 for item in order.get('items', []):
129 product = get_product_by_id(item['product_id'])
130 email_content += f"- {product['name']} x {item['quantity']}: ${item['total_price']:.2f}
131"
132 send_email(to=args.customer_email, subject=f"Order Confirmation", body=email_content)
133 return { 'sent': True }
134
135@hy.task
136def check_fraud():
137 """Simple fraud check; raise if risk too high."""
138 risk = 0.02 # stubbed risk score
139 if risk >= 0.5:
140 raise Exception('High fraud risk')
141 return { 'cleared': True, 'risk': risk }
142
143@hy.task
144def create_shipping_label():
145 """Create a shipping label after order creation."""
146 ctx = get_hyrex_workflow_context()
147 args = ctx.workflow_args if ctx else None
148 order_run = ctx.durable_runs.get('create_order') if ctx else None
149 order_run.refresh() if order_run else None
150 order = (order_run.result or {}).get('order') if order_run else None
151 if not order:
152 raise Exception('Order not found')
153 label = {
154 'carrier': 'UPS',
155 'tracking_number': '1Z' + str(int(order.get('total_amount', 0) * 1000))[:8],
156 'address': args.shipping_address if args else None
157 }
158 return label
159
160@hy.task
161def notify_warehouse():
162 """Notify warehouse to pick and pack once label is created."""
163 ctx = get_hyrex_workflow_context()
164 order_run = ctx.durable_runs.get('create_order') if ctx else None
165 label_run = ctx.durable_runs.get('create_shipping_label') if ctx else None
166 order_run.refresh() if order_run else None
167 label_run.refresh() if label_run else None
168 order = (order_run.result or {}).get('order') if order_run else None
169 label = label_run.result if label_run else None
170 enqueue_warehouse_pick(order, label) # stub integration
171 return { 'notified': True }
172
173@hy.workflow(workflow_arg_schema=CheckoutArgs)
174def checkout_workflow():
175 # Validate, then run inventory reservation and fraud check in parallel
176 validate_cart_items >> [reserve_inventory, check_fraud] >> process_payment >> create_order >> [send_order_confirmation, create_shipping_label]
177 # Branch from label creation to warehouse notification
178 create_shipping_label >> notify_warehouse
Step 2: Trigger and Track the Workflow
Expose APIs that trigger the named workflow and report run status. Clients receive a workflow run ID to poll for completion while the durable workflow coordinates payment, inventory, and order creation.
1from fastapi import FastAPI
2from pydantic import BaseModel
3from .workflows import checkout_workflow
4
5app = FastAPI()
6
7class CheckoutRequest(BaseModel):
8 cart_id: str
9 customer_id: str
10 customer_email: str
11 payment_method_id: str
12 shipping_address: dict
13 billing_address: dict
14 return_url: str | None = None
15
16@app.post("/checkout/process")
17async def initiate_checkout(request: CheckoutRequest):
18 checkout_data = request.model_dump()
19 run = checkout_workflow.send(checkout_data)
20 return {"message": "Checkout initiated", "workflow_run_id": run.id, "cart_id": request.cart_id}
Reliable checkout that never loses money!
Workflows give you explicit DAGs, durable roots, and clear run histories. Failed payments are handled gracefully, inventory is managed accurately, and order confirmation emails are sent consistently—without manual stitching.
Scale further with fraud detection, multi-currency support, subscription billing, or complex pricing rules—all orchestrated through reliable Hyrex workflows.
Explore other use cases
Background Tasks
Schedule and run long-running jobs with automatic retries and monitoring.
Human-in-the-Loop
Integrate human approval steps with pause/resume workflows.
User Onboarding
Orchestrate multi-step user setup flows with email notifications and approvals.
Context Engineering
Orchestrate LLM context preparation with parallel processing.