Overview
This example demonstrates a complete checkout workflow:- Cart validation with real-time pricing and inventory checks
- Payment processing through Stripe with error handling
- Inventory reservation with rollback on failures
- Order creation and confirmation emails
- Failure handling with proper cleanup and customer notifications
- Orchestrated workflows ensuring data consistency
Task Definitions
Copy
Ask AI
from hyrex import HyrexRegistry
import stripe
import requests
from decimal import Decimal
import os
hy = HyrexRegistry()
@hy.task
def validate_cart_items(cart_id: str):
"""Validate cart items and calculate totals"""
cart = get_cart_from_db(cart_id)
total_amount = Decimal('0.00')
validated_items = []
for item in cart['items']:
product = get_product_by_id(item['product_id'])
# Check stock availability
if product['stock_quantity'] < item['quantity']:
raise Exception(f"Insufficient stock for {product['name']}")
# Validate pricing
item_total = product['price'] * item['quantity']
total_amount += item_total
validated_items.append({
'product_id': item['product_id'],
'quantity': item['quantity'],
'unit_price': product['price'],
'total_price': item_total
})
return {
'cart_id': cart_id,
'items': validated_items,
'subtotal': total_amount,
'tax_amount': calculate_tax(total_amount, cart['shipping_address']),
'shipping_cost': calculate_shipping(cart['shipping_address'])
}
@hy.task
def process_payment(payment_data: dict):
"""Process payment through Stripe"""
stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')
try:
payment_intent = stripe.PaymentIntent.create(
amount=int(payment_data['amount'] * 100), # Convert to cents
currency='usd',
payment_method=payment_data['payment_method_id'],
confirm=True,
return_url=payment_data.get('return_url')
)
return {
'payment_intent_id': payment_intent.id,
'status': payment_intent.status,
'amount': payment_data['amount']
}
except stripe.error.CardError as e:
raise Exception(f"Payment failed: {e.user_message}")
@hy.task
def reserve_inventory(cart_items: list):
"""Reserve inventory for cart items"""
reserved_items = []
for item in cart_items:
success = reserve_product_stock(
item['product_id'],
item['quantity']
)
if not success:
# Rollback previous reservations
for reserved in reserved_items:
release_product_stock(
reserved['product_id'],
reserved['quantity']
)
raise Exception(f"Could not reserve stock for product {item['product_id']}")
reserved_items.append(item)
return reserved_items
@hy.task
def create_order(order_data: dict):
"""Create order record in database"""
order = {
'customer_id': order_data['customer_id'],
'items': order_data['items'],
'subtotal': order_data['subtotal'],
'tax_amount': order_data['tax_amount'],
'shipping_cost': order_data['shipping_cost'],
'total_amount': order_data['total_amount'],
'shipping_address': order_data['shipping_address'],
'billing_address': order_data['billing_address'],
'payment_intent_id': order_data['payment_intent_id'],
'status': 'confirmed'
}
order_id = save_order_to_db(order)
return {'order_id': order_id, 'order': order}
@hy.task
def send_order_confirmation(order_id: str, customer_email: str):
"""Send order confirmation email"""
order = get_order_by_id(order_id)
email_content = f"""
Thank you for your order!
Order #: {order_id}
Total: ${order['total_amount']:.2f}
Items:
"""
for item in order['items']:
product = get_product_by_id(item['product_id'])
email_content += f"- {product['name']} x {item['quantity']}: ${item['total_price']:.2f}\\n"
email_content += f"""
Shipping Address:
{format_address(order['shipping_address'])}
We'll send you a tracking number when your order ships!
"""
send_email(
to=customer_email,
subject=f"Order Confirmation #{order_id}",
body=email_content
)
@hy.task
def process_checkout(checkout_data: dict):
"""Orchestrate the complete checkout process"""
try:
# Step 1: Validate cart and calculate totals
validation_result = validate_cart_items.send(checkout_data['cart_id']).get()
# Step 2: Reserve inventory
reserve_inventory.send(validation_result['items']).get()
# Step 3: Process payment
payment_data = {
'amount': validation_result['subtotal'] + validation_result['tax_amount'] + validation_result['shipping_cost'],
'payment_method_id': checkout_data['payment_method_id'],
'return_url': checkout_data.get('return_url')
}
payment_result = process_payment.send(payment_data).get()
# Step 4: Create order
order_data = {
**validation_result,
'customer_id': checkout_data['customer_id'],
'shipping_address': checkout_data['shipping_address'],
'billing_address': checkout_data['billing_address'],
'payment_intent_id': payment_result['payment_intent_id'],
'total_amount': payment_data['amount']
}
order_result = create_order.send(order_data).get()
# Step 5: Send confirmation email
send_order_confirmation.send(order_result['order_id'], checkout_data['customer_email'])
return {
'success': True,
'order_id': order_result['order_id'],
'payment_status': payment_result['status']
}
except Exception as e:
# Handle failures - rollback inventory, refund payment if necessary
handle_checkout_failure.send(checkout_data, str(e))
raise e
@hy.task
def handle_checkout_failure(checkout_data: dict, error_message: str):
"""Handle checkout failures with proper cleanup"""
# Log the error
log_checkout_error(checkout_data['cart_id'], error_message)
# Send failure notification to customer
send_email(
to=checkout_data['customer_email'],
subject="Order Processing Issue",
body=f"We encountered an issue processing your order. Error: {error_message}. Please try again or contact support."
)
REST API Endpoints
Copy
Ask AI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .tasks import process_checkout, process_refund
app = FastAPI()
class CheckoutRequest(BaseModel):
cart_id: str
customer_id: str
customer_email: str
payment_method_id: str
shipping_address: dict
billing_address: dict
return_url: str = None
class RefundRequest(BaseModel):
order_id: str
reason: str = None
amount: float = None # Partial refund amount
@app.post("/checkout/process")
async def initiate_checkout(request: CheckoutRequest):
"""Start the checkout process"""
checkout_data = {
'cart_id': request.cart_id,
'customer_id': request.customer_id,
'customer_email': request.customer_email,
'payment_method_id': request.payment_method_id,
'shipping_address': request.shipping_address,
'billing_address': request.billing_address,
'return_url': request.return_url
}
task = process_checkout.send(checkout_data)
return {
"message": "Checkout initiated",
"task_id": task.task_id,
"cart_id": request.cart_id
}
@app.get("/checkout/status/{task_id}")
async def get_checkout_status(task_id: str):
"""Get checkout processing status"""
task = hy.get_task(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.is_complete else None,
"error": task.error if task.has_failed else None
}
@app.post("/orders/refund")
async def process_refund_endpoint(request: RefundRequest):
"""Process order refund"""
task = process_refund.send(request.order_id, request.amount, request.reason)
return {
"message": "Refund processing started",
"task_id": task.task_id,
"order_id": request.order_id
}
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
"""Get order details"""
order = get_order_by_id(order_id)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order
Advanced Checkout Features
Refund Processing
Copy
Ask AI
@hy.task
def process_refund(order_id: str, refund_amount: float = None, reason: str = None):
"""Process full or partial refund"""
order = get_order_by_id(order_id)
if not order:
raise Exception("Order not found")
# Determine refund amount
if refund_amount is None:
refund_amount = order['total_amount']
# Process refund through Stripe
stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')
try:
refund = stripe.Refund.create(
payment_intent=order['payment_intent_id'],
amount=int(refund_amount * 100), # Convert to cents
reason=reason or 'requested_by_customer'
)
# Update order status
update_order_status(order_id, 'refunded', {
'refund_id': refund.id,
'refund_amount': refund_amount,
'refund_reason': reason
})
# Restore inventory if full refund
if refund_amount == order['total_amount']:
for item in order['items']:
release_product_stock(item['product_id'], item['quantity'])
# Send refund confirmation email
send_refund_confirmation.send(order_id, order['customer_email'], refund_amount)
return {
'order_id': order_id,
'refund_id': refund.id,
'refund_amount': refund_amount,
'status': refund.status
}
except stripe.error.StripeError as e:
raise Exception(f"Refund failed: {str(e)}")
Subscription Checkout
Copy
Ask AI
@hy.task
def process_subscription_checkout(subscription_data: dict):
"""Handle recurring subscription checkout"""
try:
# Create customer if not exists
customer = create_or_get_stripe_customer(
subscription_data['customer_email'],
subscription_data['payment_method_id']
)
# Create subscription
subscription = stripe.Subscription.create(
customer=customer.id,
items=[{
'price': subscription_data['price_id'],
'quantity': subscription_data.get('quantity', 1)
}],
payment_behavior='default_incomplete',
expand=['latest_invoice.payment_intent']
)
# Save subscription to database
subscription_id = save_subscription_to_db({
'stripe_subscription_id': subscription.id,
'customer_id': subscription_data['customer_id'],
'price_id': subscription_data['price_id'],
'status': subscription.status
})
return {
'subscription_id': subscription_id,
'stripe_subscription_id': subscription.id,
'client_secret': subscription.latest_invoice.payment_intent.client_secret,
'status': subscription.status
}
except stripe.error.StripeError as e:
raise Exception(f"Subscription creation failed: {str(e)}")
Usage Examples
Copy
Ask AI
# Process checkout
curl -X POST http://localhost:8000/checkout/process \
-H "Content-Type: application/json" \
-d '{
"cart_id": "cart_123",
"customer_id": "cust_456",
"customer_email": "customer@example.com",
"payment_method_id": "pm_card_visa",
"shipping_address": {
"name": "John Doe",
"street": "123 Main St",
"city": "San Francisco",
"state": "CA",
"zip": "94105"
},
"billing_address": {
"name": "John Doe",
"street": "123 Main St",
"city": "San Francisco",
"state": "CA",
"zip": "94105"
}
}'
# Check checkout status
curl http://localhost:8000/checkout/status/task_12345
# Process refund
curl -X POST http://localhost:8000/orders/refund \
-H "Content-Type: application/json" \
-d '{
"order_id": "order_789",
"reason": "Customer requested cancellation",
"amount": 49.99
}'
Error Handling Patterns
Idempotent Operations
Copy
Ask AI
@hy.task
def idempotent_checkout(checkout_data: dict):
"""Ensure checkout can be retried safely"""
idempotency_key = f"checkout_{checkout_data['cart_id']}_{hash(str(checkout_data))}"
# Check if already processed
existing_order = get_order_by_idempotency_key(idempotency_key)
if existing_order:
return {
'success': True,
'order_id': existing_order['id'],
'already_processed': True
}
# Process with idempotency key
result = process_checkout_with_key(checkout_data, idempotency_key)
return result
Circuit Breaker Pattern
Copy
Ask AI
from functools import wraps
import time
def circuit_breaker(failure_threshold=5, recovery_timeout=60):
def decorator(func):
func.failure_count = 0
func.last_failure_time = None
func.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
@wraps(func)
def wrapper(*args, **kwargs):
if func.state == 'OPEN':
if time.time() - func.last_failure_time > recovery_timeout:
func.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
if func.state == 'HALF_OPEN':
func.state = 'CLOSED'
func.failure_count = 0
return result
except Exception as e:
func.failure_count += 1
func.last_failure_time = time.time()
if func.failure_count >= failure_threshold:
func.state = 'OPEN'
raise e
return wrapper
return decorator
@hy.task
@circuit_breaker(failure_threshold=3, recovery_timeout=30)
def stripe_payment_with_circuit_breaker(payment_data: dict):
"""Payment processing with circuit breaker protection"""
return process_payment(payment_data)
Production Considerations
- Payment security: Use Stripe’s secure payment handling and never store card details
- Inventory consistency: Implement proper locking mechanisms for stock reservations
- Idempotency: Ensure checkout operations can be retried safely
- Monitoring: Track checkout conversion rates and failure patterns
- PCI compliance: Follow PCI DSS requirements for payment data handling
- Tax calculation: Integrate with tax services for accurate calculations