Build a robust e-commerce checkout system with payment processing, inventory management, and order fulfillment using Stripe integration and transactional workflows.

Overview

This example demonstrates a complete checkout workflow:
  • Cart validation with real-time pricing and inventory checks
  • Payment processing through Stripe with error handling
  • Inventory reservation with rollback on failures
  • Order creation and confirmation emails
  • Failure handling with proper cleanup and customer notifications
  • Orchestrated workflows ensuring data consistency
This pattern is well suited to e-commerce platforms, subscription services, and any other application that requires reliable payment processing.

Task Definitions

from hyrex import HyrexRegistry
import stripe
import requests
from decimal import Decimal
import os

# Shared Hyrex registry instance; its `task` decorator is applied to every
# task function defined in this module.
hy = HyrexRegistry()

@hy.task
def validate_cart_items(cart_id: str):
    """Validate a cart's line items and compute its totals.

    Every line item is checked against the current product record: the
    product must exist, the requested quantity must be positive, and enough
    stock must be available. Prices are taken from the product record, not
    from the cart.

    Args:
        cart_id: Identifier of the cart to validate.

    Returns:
        A dict with ``cart_id``, ``items`` (one dict per line with
        ``product_id``, ``quantity``, ``unit_price`` and ``total_price``),
        ``subtotal``, ``tax_amount`` and ``shipping_cost``.

    Raises:
        Exception: If the cart is missing or empty, a product cannot be
            found, a quantity is not positive, or stock is insufficient.
    """
    cart = get_cart_from_db(cart_id)

    # An empty cart would otherwise flow through to payment with a zero
    # subtotal plus shipping, which is never a valid order.
    if not cart or not cart['items']:
        raise Exception(f"Cart {cart_id} is empty or does not exist")

    total_amount = Decimal('0.00')
    validated_items = []

    for item in cart['items']:
        quantity = item['quantity']

        # A zero or negative quantity would pass the stock check below and
        # silently reduce the order total.
        if quantity <= 0:
            raise Exception(
                f"Invalid quantity {quantity} for product {item['product_id']}"
            )

        product = get_product_by_id(item['product_id'])
        if not product:
            raise Exception(f"Product {item['product_id']} not found")

        # Check stock availability
        if product['stock_quantity'] < quantity:
            raise Exception(f"Insufficient stock for {product['name']}")

        # Price the line from the authoritative product record
        item_total = product['price'] * quantity
        total_amount += item_total

        validated_items.append({
            'product_id': item['product_id'],
            'quantity': quantity,
            'unit_price': product['price'],
            'total_price': item_total
        })

    return {
        'cart_id': cart_id,
        'items': validated_items,
        'subtotal': total_amount,
        'tax_amount': calculate_tax(total_amount, cart['shipping_address']),
        'shipping_cost': calculate_shipping(cart['shipping_address'])
    }

@hy.task
def process_payment(payment_data: dict):
    """Charge the customer by creating and confirming a Stripe PaymentIntent.

    Args:
        payment_data: Dict with ``amount`` (in dollars), ``payment_method_id``
            and, optionally, ``return_url``.

    Returns:
        A dict with ``payment_intent_id``, ``status`` (the PaymentIntent
        status reported by Stripe) and ``amount`` (echoed from the input).

    Raises:
        Exception: If the amount is not positive or Stripe declines the card.
    """
    from decimal import ROUND_HALF_UP

    stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')

    # Convert dollars to cents through Decimal(str(...)): with a float input,
    # int(19.99 * 100) truncates to 1998 because 19.99 * 100 is
    # 1998.9999999999998 in binary floating point.
    amount_in_cents = int(
        (Decimal(str(payment_data['amount'])) * 100).to_integral_value(
            rounding=ROUND_HALF_UP
        )
    )
    if amount_in_cents <= 0:
        raise Exception("Payment failed: amount must be greater than zero")

    try:
        payment_intent = stripe.PaymentIntent.create(
            amount=amount_in_cents,
            currency='usd',
            payment_method=payment_data['payment_method_id'],
            confirm=True,
            return_url=payment_data.get('return_url')
        )
    except stripe.error.CardError as e:
        # Keep the original error chained so the decline details stay
        # available in tracebacks.
        raise Exception(f"Payment failed: {e.user_message}") from e

    return {
        'payment_intent_id': payment_intent.id,
        'status': payment_intent.status,
        'amount': payment_data['amount']
    }

@hy.task
def reserve_inventory(cart_items: list):
    """Reserve stock for every cart item, all-or-nothing.

    Items are reserved one at a time in order. If any reservation is refused,
    every reservation made earlier in this call is released again before the
    error is raised.

    Args:
        cart_items: Dicts that each carry ``product_id`` and ``quantity``.

    Returns:
        The list of items that were reserved (all of ``cart_items``).

    Raises:
        Exception: If stock could not be reserved for one of the items.
    """
    held = []

    for entry in cart_items:
        if reserve_product_stock(entry['product_id'], entry['quantity']):
            held.append(entry)
            continue

        # This item could not be reserved: undo everything held so far.
        for earlier in held:
            release_product_stock(earlier['product_id'], earlier['quantity'])
        raise Exception(f"Could not reserve stock for product {entry['product_id']}")

    return held

@hy.task
def create_order(order_data: dict):
    """Persist a new order built from ``order_data``.

    The order record copies a fixed set of fields from ``order_data`` and is
    stored with status ``'confirmed'``.

    Args:
        order_data: Dict containing every field listed in ``copied_fields``.

    Returns:
        A dict with ``order_id`` (as returned by ``save_order_to_db``) and
        ``order`` (the record that was saved).
    """
    copied_fields = (
        'customer_id',
        'items',
        'subtotal',
        'tax_amount',
        'shipping_cost',
        'total_amount',
        'shipping_address',
        'billing_address',
        'payment_intent_id',
    )

    order = {field: order_data[field] for field in copied_fields}
    order['status'] = 'confirmed'

    return {'order_id': save_order_to_db(order), 'order': order}

@hy.task
def send_order_confirmation(order_id: str, customer_email: str):
    """Email an order confirmation to the customer.

    The message lists the order number, the order total, one line per item
    (product name, quantity and line total) and the shipping address.

    Args:
        order_id: Identifier of the order to confirm.
        customer_email: Address the confirmation is sent to.
    """
    order = get_order_by_id(order_id)

    header = f"""
    Thank you for your order!
    
    Order #: {order_id}
    Total: ${order['total_amount']:.2f}
    
    Items:
    """

    # Each item line ends with a real newline. The previous "\\n" put a
    # literal backslash followed by "n" into the email, so all items ran
    # together on one line.
    item_lines = []
    for item in order['items']:
        product = get_product_by_id(item['product_id'])
        item_lines.append(
            f"- {product['name']} x {item['quantity']}: ${item['total_price']:.2f}\n"
        )

    footer = f"""
    
    Shipping Address:
    {format_address(order['shipping_address'])}
    
    We'll send you a tracking number when your order ships!
    """

    send_email(
        to=customer_email,
        subject=f"Order Confirmation #{order_id}",
        body=header + "".join(item_lines) + footer
    )

def _compensate_failed_checkout(checkout_data, reserved_items, payment_result):
    """Best-effort undo of a checkout that failed before an order was created.

    Refunds a payment that already succeeded and releases reserved stock.
    Errors raised while compensating are logged, not raised, so they never
    hide the error that caused the checkout to fail.
    """
    cart_id = checkout_data.get('cart_id')

    # Only a succeeded PaymentIntent has moved money that must be returned.
    if payment_result is not None and payment_result.get('status') == 'succeeded':
        try:
            stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')
            stripe.Refund.create(payment_intent=payment_result['payment_intent_id'])
        except Exception as refund_error:
            log_checkout_error(
                cart_id, f"Refund during checkout rollback failed: {refund_error}"
            )

    for item in reserved_items:
        try:
            release_product_stock(item['product_id'], item['quantity'])
        except Exception as release_error:
            log_checkout_error(
                cart_id,
                f"Stock release during checkout rollback failed for product "
                f"{item['product_id']}: {release_error}"
            )


@hy.task
def process_checkout(checkout_data: dict):
    """Orchestrate the complete checkout process.

    Steps, in order: validate the cart, reserve inventory, take payment,
    create the order, then queue the confirmation email. If a step fails
    before the order exists, reserved stock is released and a succeeded
    payment is refunded. The failure handler is then queued and the original
    error re-raised.

    Args:
        checkout_data: Dict with ``cart_id``, ``customer_id``,
            ``customer_email``, ``payment_method_id``, ``shipping_address``,
            ``billing_address`` and, optionally, ``return_url``.

    Returns:
        A dict with ``success``, ``order_id`` and ``payment_status``.

    Raises:
        Exception: Whatever error made a checkout step fail.
    """
    reserved_items = []
    payment_result = None
    order_result = None

    try:
        # Step 1: Validate cart and calculate totals
        validation_result = validate_cart_items.send(checkout_data['cart_id']).get()

        # Step 2: Reserve inventory. reserve_inventory rolls itself back on
        # failure, so items count as reserved only once it has succeeded.
        reserve_inventory.send(validation_result['items']).get()
        reserved_items = validation_result['items']

        # Step 3: Process payment
        payment_data = {
            'amount': validation_result['subtotal'] + validation_result['tax_amount'] + validation_result['shipping_cost'],
            'payment_method_id': checkout_data['payment_method_id'],
            'return_url': checkout_data.get('return_url')
        }
        payment_result = process_payment.send(payment_data).get()

        # Step 4: Create order
        order_data = {
            **validation_result,
            'customer_id': checkout_data['customer_id'],
            'shipping_address': checkout_data['shipping_address'],
            'billing_address': checkout_data['billing_address'],
            'payment_intent_id': payment_result['payment_intent_id'],
            'total_amount': payment_data['amount']
        }
        order_result = create_order.send(order_data).get()

        # Step 5: Queue the confirmation email. The order is already paid
        # and saved, so a failure here is logged and must not turn the
        # checkout into a failure.
        try:
            send_order_confirmation.send(order_result['order_id'], checkout_data['customer_email'])
        except Exception as email_error:
            log_checkout_error(
                checkout_data['cart_id'],
                f"Could not queue confirmation email for order "
                f"{order_result['order_id']}: {email_error}"
            )

        return {
            'success': True,
            'order_id': order_result['order_id'],
            'payment_status': payment_result['status']
        }

    except Exception as e:
        # Undo side effects only while no order exists; a saved order must
        # keep its stock and payment.
        if order_result is None:
            _compensate_failed_checkout(checkout_data, reserved_items, payment_result)

        handle_checkout_failure.send(checkout_data, str(e))
        raise

@hy.task
def handle_checkout_failure(checkout_data: dict, error_message: str):
    """Record a failed checkout and tell the customer about it.

    Args:
        checkout_data: Checkout payload; ``cart_id`` and ``customer_email``
            are read from it.
        error_message: Description of what went wrong; it is logged and
            included in the email body.
    """
    log_checkout_error(checkout_data['cart_id'], error_message)

    # Notify the customer that the order did not go through.
    recipient = checkout_data['customer_email']
    notice = f"We encountered an issue processing your order. Error: {error_message}. Please try again or contact support."
    send_email(to=recipient, subject="Order Processing Issue", body=notice)

REST API Endpoints

from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from .tasks import process_checkout, process_refund

# FastAPI application object; the route decorators below attach the
# checkout and order endpoints to it.
app = FastAPI()

class CheckoutRequest(BaseModel):
    """Request body for ``POST /checkout/process``."""

    cart_id: str
    customer_id: str
    customer_email: str
    payment_method_id: str
    shipping_address: dict
    billing_address: dict
    # Optional: may be omitted or sent as JSON null. Annotated Optional so an
    # explicit null is accepted rather than rejected as "not a string".
    return_url: Optional[str] = None

class RefundRequest(BaseModel):
    """Request body for ``POST /orders/refund``."""

    order_id: str
    # Both fields below may be omitted or sent as JSON null.
    reason: Optional[str] = None
    amount: Optional[float] = None  # Partial refund amount

@app.post("/checkout/process")
async def initiate_checkout(request: CheckoutRequest):
    """Queue a checkout for the submitted cart and return the task handle.

    The request fields are copied into a plain dict and handed to the
    ``process_checkout`` task; the response carries the queued task's id so
    the caller can look up its status later.
    """
    forwarded_fields = (
        'cart_id',
        'customer_id',
        'customer_email',
        'payment_method_id',
        'shipping_address',
        'billing_address',
        'return_url',
    )
    checkout_data = {name: getattr(request, name) for name in forwarded_fields}

    queued = process_checkout.send(checkout_data)

    return dict(
        message="Checkout initiated",
        task_id=queued.task_id,
        cart_id=request.cart_id,
    )

@app.get("/checkout/status/{task_id}")
async def get_checkout_status(task_id: str):
    """Report the status of a previously queued checkout task.

    ``result`` is filled in only when the task reports completion and
    ``error`` only when it reports failure; otherwise each is ``None``.
    """
    task = hy.get_task(task_id)
    status = task.status

    result = None
    if task.is_complete:
        result = task.result

    error = None
    if task.has_failed:
        error = task.error

    return {
        "task_id": task_id,
        "status": status,
        "result": result,
        "error": error
    }

@app.post("/orders/refund")
async def process_refund_endpoint(request: RefundRequest):
    """Queue a refund for an order and return the task handle.

    The order id, amount and reason from the request are passed, in that
    order, to the ``process_refund`` task.
    """
    queued = process_refund.send(request.order_id, request.amount, request.reason)

    response = dict(
        message="Refund processing started",
        task_id=queued.task_id,
        order_id=request.order_id,
    )
    return response

@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """Return the stored order, or respond 404 when the lookup is empty."""
    order = get_order_by_id(order_id)
    if order:
        return order

    raise HTTPException(status_code=404, detail="Order not found")

Advanced Checkout Features

Refund Processing

@hy.task
def process_refund(order_id: str, refund_amount: float = None, reason: str = None):
    """Refund all or part of an order through Stripe.

    Args:
        order_id: Identifier of the order to refund.
        refund_amount: Amount to refund in dollars. ``None`` refunds the
            order's full ``total_amount``.
        reason: Why the refund was issued. Stored on the order as given.
            Stripe only accepts ``duplicate``, ``fraudulent`` or
            ``requested_by_customer``, so any other text is sent to Stripe
            as ``requested_by_customer``.

    Returns:
        A dict with ``order_id``, ``refund_id``, ``refund_amount`` and
        ``status`` (the refund status reported by Stripe).

    Raises:
        Exception: If the order does not exist, the amount is not positive
            or exceeds the order total, or Stripe rejects the refund.
    """
    from decimal import ROUND_HALF_UP, Decimal

    def to_cents(value):
        # Go through Decimal(str(...)) so float inputs are not truncated:
        # int(19.99 * 100) is 1998, not 1999.
        return int(
            (Decimal(str(value)) * 100).to_integral_value(rounding=ROUND_HALF_UP)
        )

    order = get_order_by_id(order_id)
    if not order:
        raise Exception("Order not found")

    # Determine refund amount
    if refund_amount is None:
        refund_amount = order['total_amount']

    refund_cents = to_cents(refund_amount)
    total_cents = to_cents(order['total_amount'])
    if refund_cents <= 0:
        raise Exception("Refund failed: refund amount must be greater than zero")
    if refund_cents > total_cents:
        raise Exception("Refund failed: refund amount exceeds the order total")

    # Stripe rejects any reason outside this fixed set, so free text is
    # mapped to the generic value and kept only on the order record.
    stripe_reasons = ('duplicate', 'fraudulent', 'requested_by_customer')
    stripe_reason = reason if reason in stripe_reasons else 'requested_by_customer'

    # Process refund through Stripe
    stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')

    try:
        refund = stripe.Refund.create(
            payment_intent=order['payment_intent_id'],
            amount=refund_cents,
            reason=stripe_reason
        )

        # Update order status
        # NOTE(review): partial refunds are also stored as 'refunded', as
        # before; confirm whether a separate status is wanted.
        update_order_status(order_id, 'refunded', {
            'refund_id': refund.id,
            'refund_amount': refund_amount,
            'refund_reason': reason
        })

        # Restore inventory if full refund. Compare in whole cents so that
        # float/Decimal representation differences cannot hide a full refund.
        if refund_cents == total_cents:
            for item in order['items']:
                release_product_stock(item['product_id'], item['quantity'])

        # Send refund confirmation email
        # NOTE(review): assumes the stored order exposes 'customer_email' --
        # confirm against get_order_by_id.
        send_refund_confirmation.send(order_id, order['customer_email'], refund_amount)

        return {
            'order_id': order_id,
            'refund_id': refund.id,
            'refund_amount': refund_amount,
            'status': refund.status
        }

    except stripe.error.StripeError as e:
        raise Exception(f"Refund failed: {str(e)}") from e

Subscription Checkout

@hy.task
def process_subscription_checkout(subscription_data: dict):
    """Create a Stripe subscription and record it locally.

    The subscription is created with ``payment_behavior='default_incomplete'``
    and the first invoice's PaymentIntent is expanded so its client secret
    can be returned to the caller.

    Args:
        subscription_data: Dict with ``customer_email``,
            ``payment_method_id``, ``price_id``, ``customer_id`` and,
            optionally, ``quantity`` (defaults to 1).

    Returns:
        A dict with ``subscription_id`` (local record id),
        ``stripe_subscription_id``, ``client_secret`` and ``status``.
        ``client_secret`` is ``None`` when the first invoice has no
        PaymentIntent.

    Raises:
        Exception: If Stripe rejects the subscription.
    """
    # Set the API key here as the other Stripe tasks do, so this task does
    # not depend on another task having run first in the same process.
    stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')

    try:
        # Create customer if not exists
        customer = create_or_get_stripe_customer(
            subscription_data['customer_email'],
            subscription_data['payment_method_id']
        )

        # Create subscription
        subscription = stripe.Subscription.create(
            customer=customer.id,
            items=[{
                'price': subscription_data['price_id'],
                'quantity': subscription_data.get('quantity', 1)
            }],
            payment_behavior='default_incomplete',
            expand=['latest_invoice.payment_intent']
        )

        # Save subscription to database
        subscription_id = save_subscription_to_db({
            'stripe_subscription_id': subscription.id,
            'customer_id': subscription_data['customer_id'],
            'price_id': subscription_data['price_id'],
            'status': subscription.status
        })

        # The first invoice may have no PaymentIntent, so walk the chain
        # defensively instead of raising AttributeError after the
        # subscription has already been created and saved.
        latest_invoice = getattr(subscription, 'latest_invoice', None)
        payment_intent = getattr(latest_invoice, 'payment_intent', None)
        client_secret = getattr(payment_intent, 'client_secret', None)

        return {
            'subscription_id': subscription_id,
            'stripe_subscription_id': subscription.id,
            'client_secret': client_secret,
            'status': subscription.status
        }

    except stripe.error.StripeError as e:
        raise Exception(f"Subscription creation failed: {str(e)}") from e

Usage Examples

# Process checkout
curl -X POST http://localhost:8000/checkout/process \
  -H "Content-Type: application/json" \
  -d '{
    "cart_id": "cart_123",
    "customer_id": "cust_456",
    "customer_email": "customer@example.com",
    "payment_method_id": "pm_card_visa",
    "shipping_address": {
      "name": "John Doe",
      "street": "123 Main St",
      "city": "San Francisco",
      "state": "CA",
      "zip": "94105"
    },
    "billing_address": {
      "name": "John Doe",
      "street": "123 Main St", 
      "city": "San Francisco",
      "state": "CA",
      "zip": "94105"
    }
  }'

# Check checkout status
curl http://localhost:8000/checkout/status/task_12345

# Process refund
curl -X POST http://localhost:8000/orders/refund \
  -H "Content-Type: application/json" \
  -d '{
    "order_id": "order_789",
    "reason": "Customer requested cancellation",
    "amount": 49.99
  }'

Error Handling Patterns

Idempotent Operations

@hy.task
def idempotent_checkout(checkout_data: dict):
    """Run a checkout at most once per distinct checkout payload.

    An idempotency key is derived from the cart id and a SHA-256 digest of
    the payload. If an order already exists for that key it is returned
    instead of processing the checkout again.

    Args:
        checkout_data: Checkout payload; must contain ``cart_id``.

    Returns:
        For a repeat: a dict with ``success``, ``order_id`` and
        ``already_processed``. Otherwise whatever
        ``process_checkout_with_key`` returns.
    """
    import hashlib
    import json

    # The key must be identical in every worker process and after every
    # restart. Built-in hash() on strings is salted per process, and
    # str(dict) depends on key order, so use a digest of a canonical JSON
    # form. default=str is only there to give non-JSON values a stable
    # text form; the output is never parsed back.
    canonical = json.dumps(checkout_data, sort_keys=True, default=str)
    digest = hashlib.sha256(canonical.encode('utf-8')).hexdigest()
    idempotency_key = f"checkout_{checkout_data['cart_id']}_{digest}"

    # Check if already processed
    existing_order = get_order_by_idempotency_key(idempotency_key)
    if existing_order:
        return {
            'success': True,
            'order_id': existing_order['id'],
            'already_processed': True
        }

    # Process with idempotency key
    return process_checkout_with_key(checkout_data, idempotency_key)

Circuit Breaker Pattern

from functools import wraps
import time

def circuit_breaker(failure_threshold=5, recovery_timeout=60):
    """Build a decorator that stops calling a function that keeps failing.

    States, stored as attributes on the decorated function:
      * CLOSED: calls pass through. ``failure_threshold`` consecutive
        failures move the breaker to OPEN.
      * OPEN: calls fail immediately with ``Exception("Circuit breaker is
        OPEN")`` until ``recovery_timeout`` seconds have passed since the
        last failure.
      * HALF_OPEN: one trial call is let through. Success closes the
        breaker; failure opens it again.

    Args:
        failure_threshold: Consecutive failures that open the breaker.
        recovery_timeout: Seconds to wait in OPEN before a trial call.

    Returns:
        A decorator that wraps a function with the breaker logic.
    """
    def decorator(func):
        func.failure_count = 0
        func.last_failure_time = None
        func.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

        @wraps(func)
        def wrapper(*args, **kwargs):
            if func.state == 'OPEN':
                # Monotonic clock: elapsed time must not be affected by
                # wall-clock adjustments.
                if time.monotonic() - func.last_failure_time > recovery_timeout:
                    func.state = 'HALF_OPEN'
                else:
                    raise Exception("Circuit breaker is OPEN")

            try:
                result = func(*args, **kwargs)
            except Exception:
                func.failure_count += 1
                func.last_failure_time = time.monotonic()

                # A failed trial call re-opens the breaker at once.
                if func.state == 'HALF_OPEN' or func.failure_count >= failure_threshold:
                    func.state = 'OPEN'

                raise

            # Any success resets the count, so only consecutive failures
            # can trip the breaker. Previously isolated failures piled up
            # over time and eventually opened it.
            func.state = 'CLOSED'
            func.failure_count = 0
            return result

        return wrapper
    return decorator

@hy.task
@circuit_breaker(failure_threshold=3, recovery_timeout=30)
def stripe_payment_with_circuit_breaker(payment_data: dict):
    """Run ``process_payment`` behind a circuit breaker.

    The breaker is configured with a failure threshold of 3 and a recovery
    timeout of 30 seconds.
    """
    outcome = process_payment(payment_data)
    return outcome

Production Considerations

  • Payment security: Use Stripe’s secure payment handling and never store card details
  • Inventory consistency: Implement proper locking mechanisms for stock reservations
  • Idempotency: Ensure checkout operations can be retried safely
  • Monitoring: Track checkout conversion rates and failure patterns
  • PCI compliance: Follow PCI DSS requirements for payment data handling
  • Tax calculation: Integrate with tax services for accurate calculations

Next Steps