Ecommerce Checkout Flow with Hyrex

Checkout flows are critical but complex: payment processing, inventory management, order creation, and email notifications must all work together. A single failure can leave a customer charged without an order, or an order confirmed without a successful payment. How do you make checkout bulletproof?

With Hyrex, you can orchestrate reliable checkout workflows that handle payment processing, inventory reservation, order creation, and notifications as atomic operations. Built-in error handling ensures failed payments are rolled back and customers are notified appropriately.

Ecommerce Checkout Flow

Step 1: Define a Checkout Workflow

Wrap your existing tasks in an explicit Hyrex workflow to get a named, triggerable checkout with clear fan-in/fan-out, durable roots, and centralized failure handling. This makes it easy to trigger from your app or on a schedule, and to inspect runs in the workflow UI.

src/hyrex/checkout_workflow.py
1from hyrex import HyrexRegistry, get_hyrex_workflow_context
2from pydantic import BaseModel
3from typing import Optional, Dict, Any
4import stripe, os
5from decimal import Decimal
6
7hy = HyrexRegistry()
8
class CheckoutArgs(BaseModel):
    # Arguments for one checkout workflow run. Tasks read them through
    # get_hyrex_workflow_context().workflow_args.

    # Who is checking out, and which cart.
    cart_id: str
    customer_id: str
    customer_email: str

    # Payment method handed to stripe.PaymentIntent.create.
    payment_method_id: str

    # Addresses are free-form dicts; they are copied onto the order as-is.
    shipping_address: Dict[str, Any]
    billing_address: Dict[str, Any]

    # Optional; forwarded to stripe.PaymentIntent.create as return_url.
    return_url: Optional[str] = None
17
@hy.task
def validate_cart_items():
    """Validate cart items and calculate totals. Reads workflow args.

    Loads the cart named by the workflow's ``cart_id``, checks each line item
    against the product's current ``stock_quantity``, and prices the cart.

    Returns:
        dict with ``items`` (one entry per cart line: ``product_id``,
        ``quantity``, ``unit_price``, ``total_price``) plus ``subtotal``,
        ``tax_amount`` and ``shipping_cost`` converted to float.

    Raises:
        Exception: if the workflow args carry no cart_id, the cart cannot be
            loaded, or a product has less stock than the cart requests.
    """
    ctx = get_hyrex_workflow_context()
    cart_id = ctx.workflow_args.cart_id if ctx and ctx.workflow_args else None
    if not cart_id:
        # Fail with a clear message instead of looking up cart "None".
        raise Exception('Missing cart_id in workflow args')

    cart = get_cart_from_db(cart_id)
    if not cart:
        raise Exception(f"Cart {cart_id} not found")

    total_amount = Decimal('0.00')
    validated_items = []
    for item in cart['items']:
        product = get_product_by_id(item['product_id'])
        if product['stock_quantity'] < item['quantity']:
            raise Exception(f"Insufficient stock for {product['name']}")
        item_total = product['price'] * item['quantity']
        total_amount += item_total
        validated_items.append({
            'product_id': item['product_id'],
            'quantity': item['quantity'],
            'unit_price': product['price'],
            'total_price': item_total,
        })

    # Tax and shipping are derived from the address stored on the cart.
    return {
        'items': validated_items,
        'subtotal': float(total_amount),
        'tax_amount': float(calculate_tax(total_amount, cart['shipping_address'])),
        'shipping_cost': float(calculate_shipping(cart['shipping_address'])),
    }
46
@hy.task
def reserve_inventory():
    """Reserve inventory using result from validate_cart_items.

    Returns:
        ``{'reserved': True}`` once every validated line item is reserved.

    Raises:
        Exception: if the validate_cart_items result is unavailable, or
            reserve_product_stock reports failure for any item.
    """
    ctx = get_hyrex_workflow_context()
    prev = ctx.durable_runs.get('validate_cart_items') if ctx else None
    if prev:
        prev.refresh()
    validation = prev.result if prev else None
    if not validation:
        # Without the validated item list nothing would be reserved, so do
        # not report success; fail like the other tasks do on a missing
        # upstream result.
        raise Exception('Validation result not found')

    for item in validation.get('items', []):
        if not reserve_product_stock(item['product_id'], item['quantity']):
            # NOTE(review): items reserved earlier in this loop are not
            # released here - confirm whether a compensating release exists.
            raise Exception(f"Could not reserve stock for product {item['product_id']}")
    return {'reserved': True}
63
@hy.task
def process_payment():
    """Process payment using validation totals and workflow args.

    Charges subtotal + tax + shipping from the validate_cart_items result by
    creating and confirming a Stripe PaymentIntent in USD.

    Returns:
        dict with ``payment_intent_id``, ``status`` (as reported on the
        PaymentIntent) and ``amount`` (the total in dollars).

    Raises:
        Exception: if the workflow args or the validation result are missing.
    """
    ctx = get_hyrex_workflow_context()
    args = ctx.workflow_args if ctx else None
    if not args:
        # payment_method_id lives on the args; fail clearly instead of
        # hitting an AttributeError on None further down.
        raise Exception('Missing workflow args')

    prev = ctx.durable_runs.get('validate_cart_items')
    if prev:
        prev.refresh()
    validation = prev.result if prev else None
    if not validation:
        # Never fall through to a zero-amount charge when totals are missing.
        raise Exception('Validation result not found')

    amount = validation.get('subtotal', 0) + validation.get('tax_amount', 0) + validation.get('shipping_cost', 0)
    # The totals are floats, so amount * 100 can land just below the intended
    # integer (0.29 * 100 == 28.999999999999996). Round before converting so
    # the charge is not one cent short.
    amount_in_cents = int(round(amount * 100))

    stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')
    # NOTE(review): if this task is retried, a second PaymentIntent would be
    # created - consider passing an idempotency key.
    payment_intent = stripe.PaymentIntent.create(
        amount=amount_in_cents,
        currency='usd',
        payment_method=args.payment_method_id,
        confirm=True,
        return_url=args.return_url,
    )
    # NOTE(review): status is returned as-is and not checked here; confirm
    # that downstream tasks handle statuses other than 'succeeded'.
    return {
        'payment_intent_id': payment_intent.id,
        'status': payment_intent.status,
        'amount': amount,
    }
86
def _load_task_result(ctx, task_name):
    """Return the refreshed result of the named upstream task run, or None if absent."""
    run = ctx.durable_runs.get(task_name) if ctx else None
    if not run:
        return None
    run.refresh()
    return run.result


@hy.task
def create_order():
    """Create order using validation + payment results and workflow args.

    Combines the validated line items and totals, the payment outcome, and
    the customer/address data from the workflow args into one order record
    and saves it.

    Returns:
        dict with ``order_id`` (as returned by save_order_to_db) and
        ``order`` (the saved order dict).

    Raises:
        Exception: if the workflow args, the validation result, or the
            payment result are missing.
    """
    ctx = get_hyrex_workflow_context()
    args = ctx.workflow_args if ctx else None
    if not args:
        raise Exception('Missing workflow args')

    # A missing upstream result must not produce an empty "confirmed" order.
    validation = _load_task_result(ctx, 'validate_cart_items')
    if not validation:
        raise Exception('Validation result not found')
    payment = _load_task_result(ctx, 'process_payment')
    if not payment:
        raise Exception('Payment result not found')

    # NOTE(review): status is set to 'confirmed' without looking at
    # payment['status'] - confirm that is intended.
    order = {
        'customer_id': args.customer_id,
        'items': validation.get('items', []),
        'subtotal': validation.get('subtotal', 0),
        'tax_amount': validation.get('tax_amount', 0),
        'shipping_cost': validation.get('shipping_cost', 0),
        'total_amount': payment.get('amount', 0),
        'shipping_address': args.shipping_address,
        'billing_address': args.billing_address,
        'payment_intent_id': payment.get('payment_intent_id'),
        'status': 'confirmed',
    }
    order_id = save_order_to_db(order)
    return {'order_id': order_id, 'order': order}
112
@hy.task
def send_order_confirmation():
    """Email the customer a confirmation for the order built by create_order.

    The body lists the order number, the total, and one line per item.

    Returns:
        ``{'sent': True}`` after send_email has been called.

    Raises:
        Exception: if the workflow args are missing or no order is found in
            the create_order result.
    """
    ctx = get_hyrex_workflow_context()
    args = ctx.workflow_args if ctx else None
    if not args:
        # customer_email comes from the workflow args.
        raise Exception('Missing workflow args')

    order_run = ctx.durable_runs.get('create_order')
    if order_run:
        order_run.refresh()
    order_result = (order_run.result or {}) if order_run else {}
    order = order_result.get('order')

    if not order:
        raise Exception('Order not found')

    # Show the stored order id as the order number; fall back to the payment
    # intent id only when create_order returned no id.
    order_ref = order_result.get('order_id') or order.get('payment_intent_id')

    # Build email: one text line per entry, each terminated by a newline.
    lines = [
        f"Order #: {order_ref}",
        f"Total: ${order.get('total_amount'):.2f}",
    ]
    for item in order.get('items', []):
        product = get_product_by_id(item['product_id'])
        lines.append(f"- {product['name']} x {item['quantity']}: ${item['total_price']:.2f}")
    email_content = "\n".join(lines) + "\n"

    send_email(to=args.customer_email, subject="Order Confirmation", body=email_content)
    return {'sent': True}
134
@hy.task
def check_fraud():
    """Run the (stubbed) fraud screen and fail the task when risk is too high."""
    risk_threshold = 0.5
    risk_score = 0.02  # placeholder value; no real scoring is performed here
    if risk_score >= risk_threshold:
        raise Exception('High fraud risk')
    return dict(cleared=True, risk=risk_score)
142
@hy.task
def create_shipping_label():
    """Build a shipping label for the order produced by create_order.

    Raises Exception('Order not found') when the create_order result holds
    no order.
    """
    ctx = get_hyrex_workflow_context()
    workflow_args = ctx.workflow_args if ctx else None
    run = ctx.durable_runs.get('create_order') if ctx else None

    order = None
    if run:
        run.refresh()
        order = (run.result or {}).get('order')
    if not order:
        raise Exception('Order not found')

    # Stub tracking number: "1Z" plus up to eight digits taken from the total.
    total_digits = str(int(order.get('total_amount', 0) * 1000))
    return {
        'carrier': 'UPS',
        'tracking_number': '1Z' + total_digits[:8],
        'address': workflow_args.shipping_address if workflow_args else None,
    }
159
@hy.task
def notify_warehouse():
    """Notify warehouse to pick and pack once label is created.

    Reads the order from the create_order result and the label from the
    create_shipping_label result, then hands both to enqueue_warehouse_pick.

    Returns:
        ``{'notified': True}`` after the pick request has been enqueued.

    Raises:
        Exception: if the order or the shipping label is missing.
    """
    ctx = get_hyrex_workflow_context()
    order_run = ctx.durable_runs.get('create_order') if ctx else None
    label_run = ctx.durable_runs.get('create_shipping_label') if ctx else None
    if order_run:
        order_run.refresh()
    if label_run:
        label_run.refresh()

    # Fail like the sibling tasks do rather than enqueue a pick with no data.
    order = (order_run.result or {}).get('order') if order_run else None
    if not order:
        raise Exception('Order not found')
    label = label_run.result if label_run else None
    if not label:
        raise Exception('Shipping label not found')

    enqueue_warehouse_pick(order, label)  # stub integration
    return {'notified': True}
172
@hy.workflow(workflow_arg_schema=CheckoutArgs)
def checkout_workflow():
    # Main path: validation first, then inventory reservation and the fraud
    # check side by side, then payment, then order creation, and finally the
    # confirmation email and the shipping label side by side.
    (
        validate_cart_items
        >> [reserve_inventory, check_fraud]
        >> process_payment
        >> create_order
        >> [send_order_confirmation, create_shipping_label]
    )
    # Extra edge: the warehouse is notified only after the label exists.
    create_shipping_label >> notify_warehouse

Step 2: Trigger and Track the Workflow

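Expose an API endpoint that triggers the named workflow. Clients receive a workflow run ID they can use to poll for completion while the durable workflow coordinates payment, inventory, and order creation. An endpoint that reports run status for that ID can be added alongside it; only the trigger endpoint is shown below.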

src/routes/checkout_workflow_api.py
1from fastapi import FastAPI
2from pydantic import BaseModel
3from .workflows import checkout_workflow
4
5app = FastAPI()
6
class CheckoutRequest(BaseModel):
    # Request body for POST /checkout/process. It is dumped to a dict and
    # sent to checkout_workflow as the workflow input.

    # Cart and customer.
    cart_id: str
    customer_id: str
    customer_email: str

    # Payment method to charge.
    payment_method_id: str

    # Free-form address payloads.
    shipping_address: dict
    billing_address: dict

    # Optional URL; omitted requests default to None.
    return_url: str | None = None
15
@app.post("/checkout/process")
async def initiate_checkout(request: CheckoutRequest):
    # Send the request fields to the checkout workflow and give the caller
    # the run id together with the cart id it belongs to.
    workflow_run = checkout_workflow.send(request.model_dump())
    return {
        "message": "Checkout initiated",
        "workflow_run_id": workflow_run.id,
        "cart_id": request.cart_id,
    }

Reliable checkout that never loses money!

Workflows give you explicit DAGs, durable roots, and clear run histories. Failed payments are handled gracefully, inventory is managed accurately, and order confirmation emails are sent consistently—without manual stitching.

Scale further with fraud detection, multi-currency support, subscription billing, or complex pricing rules—all orchestrated through reliable Hyrex workflows.