Create seamless user onboarding experiences by automating welcome emails, trial subscriptions, sample data creation, and progressive engagement workflows.

Overview

This example demonstrates comprehensive user onboarding automation:
  • Welcome email sequences with personalized messaging
  • Trial subscription setup with Stripe integration
  • Sample data creation to help users get started quickly
  • Progressive onboarding with scheduled follow-up tasks
  • Orchestrated workflows ensuring consistent user experience
  • Retry mechanisms for reliable email and payment processing
Perfect for SaaS applications, subscription services, or any platform requiring user activation.

Task Definitions

from hyrex import HyrexRegistry
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import jinja2
import stripe
import os
from datetime import datetime, timedelta

hy = HyrexRegistry()

@hy.task(max_retries=3)
def send_welcome_email(user_id: str, user_email: str, user_name: str):
    """Send personalized welcome email to new user"""
    template = jinja2.Template("""
    <html>
    <body>
        <h2>Welcome to our platform, {{ user_name }}! πŸŽ‰</h2>
        
        <p>We're excited to have you on board. Here's what you can do next:</p>
        
        <ul>
            <li><strong>Complete your profile setup</strong> - Add your details and preferences</li>
            <li><strong>Explore our dashboard</strong> - See what's possible with our tools</li>
            <li><strong>Connect with your team</strong> - Invite colleagues to collaborate</li>
            <li><strong>Try our sample projects</strong> - We've created some examples to get you started</li>
        </ul>
        
        <p>Your 14-day free trial has started - no credit card required!</p>
        
        <p><a href="https://app.company.com/dashboard?user_id={{ user_id }}" 
              style="background: #007cba; color: white; padding: 12px 24px; text-decoration: none; border-radius: 4px;">
              Get Started β†’
        </a></p>
        
        <p>Need help? Just reply to this email and we'll get back to you quickly.</p>
        
        <p>Best regards,<br>The Team</p>
    </body>
    </html>
    """)
    
    msg = MIMEMultipart('alternative')
    msg['From'] = os.environ.get('SMTP_FROM_EMAIL')
    msg['To'] = user_email
    msg['Subject'] = f"Welcome aboard, {user_name}!"
    
    html_body = template.render(user_name=user_name, user_id=user_id)
    msg.attach(MIMEText(html_body, 'html'))
    
    server = smtplib.SMTP('smtp.gmail.com', 587)
    server.starttls()
    server.login(os.environ.get('SMTP_USERNAME'), os.environ.get('SMTP_PASSWORD'))
    server.send_message(msg)
    server.quit()
    
    # Track email sent event
    track_user_event(user_id, 'welcome_email_sent', {
        'email': user_email,
        'timestamp': datetime.now().isoformat()
    })
    
    return {"email_sent": True, "recipient": user_email}

@hy.task(max_retries=3)
def setup_user_trial(user_id: str, user_email: str, plan_type: str = "basic"):
    """Create Stripe customer and trial subscription"""
    stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')
    
    try:
        # Create customer
        customer = stripe.Customer.create(
            email=user_email,
            metadata={"user_id": user_id, "onboarding": "true"}
        )
        
        # Create trial subscription
        subscription = stripe.Subscription.create(
            customer=customer.id,
            items=[{"price": f"price_{plan_type}_monthly"}],
            trial_period_days=14,
            metadata={"user_id": user_id, "plan_type": plan_type}
        )
        
        # Save subscription info to database
        save_subscription_info(user_id, {
            'stripe_customer_id': customer.id,
            'stripe_subscription_id': subscription.id,
            'plan_type': plan_type,
            'trial_start': datetime.now().isoformat(),
            'trial_end': (datetime.now() + timedelta(days=14)).isoformat()
        })
        
        return {
            "customer_id": customer.id,
            "subscription_id": subscription.id,
            "trial_days": 14,
            "plan_type": plan_type
        }
        
    except stripe.error.StripeError as e:
        # Log error but don't fail onboarding
        log_error(f"Stripe setup failed for user {user_id}: {str(e)}")
        return {"error": "trial_setup_failed", "message": str(e)}

@hy.task
def create_sample_data(user_id: str):
    """Generate sample data and projects for new users"""
    sample_projects = [
        {
            "name": "My First Project",
            "description": "Get started with this example project",
            "template": "getting_started",
            "sample_data": True
        },
        {
            "name": "Demo Workflow",
            "description": "See how automation works with this demo",
            "template": "automation_demo",
            "sample_data": True
        }
    ]
    
    created_projects = []
    for project_data in sample_projects:
        project_id = create_user_project(user_id, project_data)
        populate_sample_content(project_id, project_data['template'])
        created_projects.append(project_id)
    
    # Create sample team
    create_sample_team(user_id, "My Team")
    
    return {
        "sample_projects": created_projects,
        "sample_team_created": True
    }

@hy.task
def schedule_followup_emails(user_id: str, user_email: str, user_name: str):
    """Schedule progressive onboarding email sequence"""
    follow_up_emails = [
        {"delay_hours": 24, "template": "day_1_tips", "subject": "Quick tips to get the most out of your trial"},
        {"delay_hours": 72, "template": "day_3_features", "subject": "Unlock powerful features you might have missed"},
        {"delay_hours": 168, "template": "week_1_checkin", "subject": "How's your first week going?"},
        {"delay_hours": 312, "template": "trial_ending", "subject": "Your trial ends soon - let's keep you going!"}
    ]
    
    scheduled_tasks = []
    for email_config in follow_up_emails:
        # Schedule email to be sent after delay
        scheduled_time = datetime.now() + timedelta(hours=email_config['delay_hours'])
        
        task = send_followup_email.send_at(
            scheduled_time,
            user_id,
            user_email, 
            user_name,
            email_config['template'],
            email_config['subject']
        )
        
        scheduled_tasks.append({
            "task_id": task.task_id,
            "template": email_config['template'],
            "scheduled_for": scheduled_time.isoformat()
        })
    
    return {"scheduled_emails": len(scheduled_tasks), "tasks": scheduled_tasks}

@hy.task
def send_followup_email(user_id: str, user_email: str, user_name: str, template_name: str, subject: str):
    """Send scheduled follow-up email"""
    # Check if user is still active and hasn't unsubscribed
    user_status = get_user_status(user_id)
    if user_status['unsubscribed'] or user_status['churned']:
        return {"skipped": True, "reason": f"User status: {user_status}"}
    
    # Get personalized content based on user activity
    user_activity = get_user_activity_summary(user_id)
    template_data = {
        "user_name": user_name,
        "user_id": user_id,
        "activity_summary": user_activity,
        "days_since_signup": (datetime.now() - user_status['signup_date']).days
    }
    
    send_template_email(user_email, subject, template_name, template_data)
    
    return {"email_sent": True, "template": template_name}

@hy.task
def track_onboarding_completion(user_id: str):
    """Track user onboarding progress and completion"""
    completion_criteria = [
        'profile_completed',
        'first_project_created', 
        'team_member_invited',
        'first_workflow_run'
    ]
    
    user_progress = check_user_progress(user_id, completion_criteria)
    completion_percentage = (user_progress['completed_count'] / len(completion_criteria)) * 100
    
    if completion_percentage >= 75:
        # User is well onboarded, send success email
        send_onboarding_success_email.send(user_id)
    
    update_user_onboarding_status(user_id, {
        'completion_percentage': completion_percentage,
        'completed_steps': user_progress['completed_steps'],
        'last_updated': datetime.now().isoformat()
    })
    
    return user_progress

@hy.task
def onboard_new_user(user_id: str, user_email: str, user_name: str, plan_type: str = "basic"):
    """Orchestrate the complete user onboarding process"""
    onboarding_start = datetime.now()
    
    try:
        # Step 1: Send welcome email
        welcome_result = send_welcome_email.send(user_id, user_email, user_name).get()
        
        # Step 2: Setup trial subscription
        trial_result = setup_user_trial.send(user_id, user_email, plan_type).get()
        
        # Step 3: Create sample data and projects
        sample_data_result = create_sample_data.send(user_id).get()
        
        # Step 4: Schedule follow-up email sequence
        followup_result = schedule_followup_emails.send(user_id, user_email, user_name).get()
        
        # Step 5: Initialize progress tracking
        progress_result = track_onboarding_completion.send(user_id).get()
        
        onboarding_duration = (datetime.now() - onboarding_start).total_seconds()
        
        # Record successful onboarding
        record_onboarding_completion(user_id, {
            'status': 'success',
            'duration_seconds': onboarding_duration,
            'welcome_email': welcome_result.get('email_sent', False),
            'trial_setup': trial_result.get('customer_id') is not None,
            'sample_data': len(sample_data_result.get('sample_projects', [])),
            'scheduled_emails': followup_result.get('scheduled_emails', 0),
            'completion_percentage': progress_result.get('completion_percentage', 0)
        })
        
        return {
            "status": "onboarding_completed",
            "user_id": user_id,
            "duration_seconds": onboarding_duration,
            "trial_info": trial_result,
            "sample_projects": sample_data_result['sample_projects'],
            "scheduled_followups": followup_result['scheduled_emails']
        }
        
    except Exception as e:
        # Record failed onboarding
        record_onboarding_completion(user_id, {
            'status': 'failed',
            'error': str(e),
            'duration_seconds': (datetime.now() - onboarding_start).total_seconds()
        })
        
        # Send alert to team
        send_onboarding_failure_alert(user_id, user_email, str(e))
        
        raise e

# Scheduled task to check trial expiration
@hy.cron("0 9 * * *")  # Daily at 9 AM
def process_trial_expirations():
    """Check for expiring trials and send notifications"""
    expiring_trials = get_trials_expiring_soon(days=3)
    
    for trial in expiring_trials:
        send_trial_expiration_notice.send(
            trial['user_id'],
            trial['user_email'],
            trial['user_name'],
            trial['days_remaining']
        )
    
    return {"processed_trials": len(expiring_trials)}

REST API Endpoints

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from .tasks import onboard_new_user, send_welcome_email, setup_user_trial

app = FastAPI()

class UserOnboardingRequest(BaseModel):
    user_id: str
    user_email: str
    user_name: str
    plan_type: str = "basic"
    
class WelcomeEmailRequest(BaseModel):
    user_id: str
    user_email: str
    user_name: str

class TrialSetupRequest(BaseModel):
    user_id: str
    user_email: str
    plan_type: str = "basic"

@app.post("/onboarding/start")
async def start_user_onboarding(request: UserOnboardingRequest):
    """Trigger complete user onboarding workflow"""
    try:
        task = onboard_new_user.send(
            request.user_id, 
            request.user_email, 
            request.user_name,
            request.plan_type
        )
        
        return {
            "message": "Onboarding started successfully",
            "task_id": task.task_id,
            "user_id": request.user_id,
            "plan_type": request.plan_type
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/onboarding/welcome-email") 
async def send_welcome_email_only(request: WelcomeEmailRequest):
    """Send just the welcome email"""
    try:
        task = send_welcome_email.send(
            request.user_id,
            request.user_email, 
            request.user_name
        )
        
        return {
            "message": "Welcome email queued successfully",
            "task_id": task.task_id,
            "recipient": request.user_email
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/onboarding/setup-trial")
async def setup_trial_subscription(request: TrialSetupRequest):
    """Setup trial subscription only"""
    try:
        task = setup_user_trial.send(
            request.user_id, 
            request.user_email,
            request.plan_type
        )
        
        return {
            "message": "Trial setup started",
            "task_id": task.task_id,
            "user_id": request.user_id,
            "plan_type": request.plan_type
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/onboarding/status/{task_id}")
async def get_onboarding_status(task_id: str):
    """Get onboarding task status"""
    try:
        task = hy.get_task(task_id)
        
        return {
            "task_id": task_id,
            "status": task.status,
            "result": task.result if task.is_complete else None,
            "error": task.error if task.has_failed else None,
            "created_at": task.created_at,
            "completed_at": task.completed_at
        }
    except Exception as e:
        raise HTTPException(status_code=404, detail="Task not found")

@app.get("/onboarding/progress/{user_id}")
async def get_user_onboarding_progress(user_id: str):
    """Get user's onboarding progress"""
    try:
        progress = get_user_onboarding_status(user_id)
        return {
            "user_id": user_id,
            "completion_percentage": progress.get('completion_percentage', 0),
            "completed_steps": progress.get('completed_steps', []),
            "last_updated": progress.get('last_updated'),
            "trial_info": progress.get('trial_info', {})
        }
    except Exception as e:
        raise HTTPException(status_code=404, detail="User not found")

Usage Examples

Complete Onboarding Flow

# Start full onboarding for new user
curl -X POST http://localhost:8000/onboarding/start \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user_12345",
    "user_email": "john.doe@example.com",
    "user_name": "John Doe",
    "plan_type": "pro"
  }'

# Check onboarding status
curl http://localhost:8000/onboarding/status/task_67890

# Get user's progress
curl http://localhost:8000/onboarding/progress/user_12345

Individual Components

# Send welcome email only
curl -X POST http://localhost:8000/onboarding/welcome-email \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user_12345",
    "user_email": "john.doe@example.com",
    "user_name": "John Doe"
  }'

# Setup trial subscription only
curl -X POST http://localhost:8000/onboarding/setup-trial \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user_12345",
    "user_email": "john.doe@example.com",
    "plan_type": "basic"
  }'

Advanced Onboarding Patterns

Personalized Onboarding

@hy.task
def personalized_onboarding(user_id: str, user_profile: dict):
    """Customize onboarding based on user profile"""
    if user_profile.get('role') == 'developer':
        create_developer_samples.send(user_id)
        send_technical_welcome.send(user_id, user_profile['email'])
    elif user_profile.get('company_size') == 'enterprise':
        setup_enterprise_features.send(user_id)
        schedule_demo_call.send(user_id, user_profile)
    else:
        # Standard onboarding
        create_sample_data.send(user_id)
    
    return {"onboarding_type": get_onboarding_type(user_profile)}

A/B Testing Integration

@hy.task
def ab_test_onboarding(user_id: str, user_email: str, user_name: str):
    """Run A/B test variants for onboarding"""
    variant = get_ab_test_variant(user_id, 'onboarding_flow_v2')
    
    if variant == 'control':
        # Original onboarding flow
        result = onboard_new_user.send(user_id, user_email, user_name).get()
    else:
        # New experimental flow
        result = experimental_onboarding_flow.send(user_id, user_email, user_name).get()
    
    # Track variant performance
    track_ab_test_outcome(user_id, 'onboarding_flow_v2', variant, {
        'completed': result['status'] == 'onboarding_completed',
        'duration': result.get('duration_seconds', 0)
    })
    
    return result

Cohort-Based Onboarding

@hy.task
def cohort_onboarding_analysis():
    """Analyze onboarding success by user cohort"""
    cohorts = get_recent_user_cohorts(days=30)
    
    analysis_results = []
    for cohort in cohorts:
        cohort_stats = {
            'cohort_date': cohort['date'],
            'user_count': len(cohort['users']),
            'completion_rate': calculate_completion_rate(cohort['users']),
            'avg_time_to_complete': calculate_avg_completion_time(cohort['users']),
            'trial_conversion_rate': calculate_trial_conversion(cohort['users'])
        }
        analysis_results.append(cohort_stats)
    
    # Send weekly report to product team
    send_cohort_analysis_report.send(analysis_results)
    
    return analysis_results

Email Templates

Welcome Email Template

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <style>
        .container { max-width: 600px; margin: 0 auto; font-family: Arial, sans-serif; }
        .header { background: #f8f9fa; padding: 20px; text-align: center; }
        .content { padding: 20px; }
        .cta-button { 
            display: inline-block; 
            background: #007cba; 
            color: white; 
            padding: 12px 24px; 
            text-decoration: none; 
            border-radius: 4px; 
            margin: 20px 0;
        }
        .checklist { background: #f8f9fa; padding: 15px; margin: 20px 0; }
        .checklist li { margin: 8px 0; }
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>Welcome to [Company Name]! πŸŽ‰</h1>
        </div>
        <div class="content">
            <p>Hi {{user_name}},</p>
            
            <p>Welcome aboard! We're thrilled to have you join our community of [user_count] users who are already transforming their workflows.</p>
            
            <div class="checklist">
                <h3>Your Quick Start Checklist:</h3>
                <ul>
                    <li>βœ… Account created (you're here!)</li>
                    <li>⭐ Complete your profile</li>
                    <li>πŸš€ Try your first project</li>
                    <li>πŸ‘₯ Invite your team</li>
                </ul>
            </div>
            
            <p>We've set up some sample projects to help you get started. Your 14-day free trial gives you full access to all features - no credit card required.</p>
            
            <a href="{{dashboard_url}}" class="cta-button">Get Started β†’</a>
            
            <p>Questions? Just reply to this email and our team will help you out.</p>
            
            <p>Best,<br>The [Company] Team</p>
        </div>
    </div>
</body>
</html>

Monitoring & Analytics

@hy.task
def generate_onboarding_metrics():
    """Generate onboarding performance metrics"""
    metrics = {
        'total_signups_last_30_days': count_signups(days=30),
        'onboarding_completion_rate': calculate_completion_rate(days=30),
        'average_onboarding_time': calculate_avg_onboarding_time(days=30),
        'email_open_rates': get_email_open_rates('welcome_sequence'),
        'trial_to_paid_conversion': calculate_trial_conversion(days=30),
        'drop_off_points': identify_drop_off_points(days=30)
    }
    
    # Send to analytics dashboard
    send_to_analytics_dashboard(metrics)
    
    # Alert if metrics are below threshold
    if metrics['onboarding_completion_rate'] < 0.75:
        send_alert_to_team("Onboarding completion rate below 75%", metrics)
    
    return metrics

Production Considerations

  • Email deliverability: Use reputable email services and monitor deliverability
  • Progressive enhancement: Don’t let single failures block the entire onboarding
  • A/B testing: Continuously optimize email templates and onboarding flows
  • Personalization: Tailor onboarding based on user profile and behavior
  • Performance monitoring: Track completion rates, time-to-value, and drop-off points
  • Compliance: Ensure email subscriptions comply with GDPR and CAN-SPAM

Next Steps