Overview
This example demonstrates comprehensive user onboarding automation:
- Welcome email sequences with personalized messaging
- Trial subscription setup with Stripe integration
- Sample data creation to help users get started quickly
- Progressive onboarding with scheduled follow-up tasks
- Orchestrated workflows ensuring consistent user experience
- Retry mechanisms for reliable email and payment processing
Task Definitions
Copy
Ask AI
from hyrex import HyrexRegistry
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import jinja2
import stripe
import os
from datetime import datetime, timedelta
# Shared registry instance; its `task` and `cron` decorators are applied to the functions below.
hy = HyrexRegistry()
# Parsed once at import time instead of on every task run. Autoescaping is on
# because user_name / user_id are user-controlled values rendered into HTML.
_WELCOME_EMAIL_TEMPLATE = jinja2.Template("""
<html>
<body>
<h2>Welcome to our platform, {{ user_name }}! 🎉</h2>
<p>We're excited to have you on board. Here's what you can do next:</p>
<ul>
<li><strong>Complete your profile setup</strong> - Add your details and preferences</li>
<li><strong>Explore our dashboard</strong> - See what's possible with our tools</li>
<li><strong>Connect with your team</strong> - Invite colleagues to collaborate</li>
<li><strong>Try our sample projects</strong> - We've created some examples to get you started</li>
</ul>
<p>Your 14-day free trial has started - no credit card required!</p>
<p><a href="https://app.company.com/dashboard?user_id={{ user_id | urlencode }}"
style="background: #007cba; color: white; padding: 12px 24px; text-decoration: none; border-radius: 4px;">
Get Started →
</a></p>
<p>Need help? Just reply to this email and we'll get back to you quickly.</p>
<p>Best regards,<br>The Team</p>
</body>
</html>
""", autoescape=True)


@hy.task(max_retries=3)
def send_welcome_email(user_id: str, user_email: str, user_name: str):
    """Send the personalized HTML welcome email to a new user.

    Args:
        user_id: Identifier of the user; embedded (URL-encoded) in the
            dashboard link and passed to the tracking call.
        user_email: Recipient address.
        user_name: Display name used in the subject and greeting.

    Returns:
        ``{"email_sent": True, "recipient": user_email}`` once the message
        has been handed to the SMTP server and the event has been tracked.

    Raises:
        smtplib.SMTPException / OSError: Propagated from the SMTP session.

    Environment:
        SMTP_FROM_EMAIL, SMTP_USERNAME, SMTP_PASSWORD are read as before.
        SMTP_HOST / SMTP_PORT are optional and default to the previously
        hard-coded ``smtp.gmail.com`` / ``587``.
    """
    msg = MIMEMultipart('alternative')
    msg['From'] = os.environ.get('SMTP_FROM_EMAIL')
    msg['To'] = user_email
    msg['Subject'] = f"Welcome aboard, {user_name}!"

    html_body = _WELCOME_EMAIL_TEMPLATE.render(user_name=user_name, user_id=user_id)
    # The body contains non-ASCII characters, so state the charset explicitly.
    msg.attach(MIMEText(html_body, 'html', 'utf-8'))

    smtp_host = os.environ.get('SMTP_HOST', 'smtp.gmail.com')
    smtp_port = int(os.environ.get('SMTP_PORT', '587'))

    # The context manager closes the connection even when starttls/login/send
    # raises; the timeout keeps a hung server from blocking the caller forever.
    with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
        server.starttls()
        server.login(os.environ.get('SMTP_USERNAME'), os.environ.get('SMTP_PASSWORD'))
        server.send_message(msg)

    # Track email sent event
    # NOTE(review): if this call raises after the mail was sent, the exception
    # propagates; confirm whether the task's retry policy could then resend.
    track_user_event(user_id, 'welcome_email_sent', {
        'email': user_email,
        'timestamp': datetime.now().isoformat()
    })
    return {"email_sent": True, "recipient": user_email}
@hy.task(max_retries=3)
def setup_user_trial(user_id: str, user_email: str, plan_type: str = "basic"):
    """Create a Stripe customer and a trial subscription for a new user.

    Args:
        user_id: Identifier stored in the Stripe metadata and used as the key
            for ``save_subscription_info``.
        user_email: Email set on the Stripe customer.
        plan_type: Plan name; the Stripe price id is built as
            ``price_{plan_type}_monthly``.

    Returns:
        On success, a dict with ``customer_id``, ``subscription_id``,
        ``trial_days`` and ``plan_type``. When Stripe raises a
        ``StripeError``, the error is logged and
        ``{"error": "trial_setup_failed", "message": ...}`` is returned
        instead of raising, so the caller is not failed by Stripe problems.
    """
    trial_days = 14  # single source of truth for the trial length used below
    stripe.api_key = os.environ.get('STRIPE_SECRET_KEY')
    try:
        # Idempotency keys make a re-run after a partial failure (e.g. the
        # database save raising and the task being retried) reuse the objects
        # Stripe already created instead of creating duplicates.
        customer = stripe.Customer.create(
            email=user_email,
            metadata={"user_id": user_id, "onboarding": "true"},
            idempotency_key=f"onboarding-customer-{user_id}",
        )
        subscription = stripe.Subscription.create(
            customer=customer.id,
            items=[{"price": f"price_{plan_type}_monthly"}],
            trial_period_days=trial_days,
            metadata={"user_id": user_id, "plan_type": plan_type},
            idempotency_key=f"onboarding-subscription-{user_id}-{plan_type}",
        )

        # Take one timestamp so trial_end is exactly trial_days after trial_start.
        trial_start = datetime.now()
        save_subscription_info(user_id, {
            'stripe_customer_id': customer.id,
            'stripe_subscription_id': subscription.id,
            'plan_type': plan_type,
            'trial_start': trial_start.isoformat(),
            'trial_end': (trial_start + timedelta(days=trial_days)).isoformat()
        })
        return {
            "customer_id": customer.id,
            "subscription_id": subscription.id,
            "trial_days": trial_days,
            "plan_type": plan_type
        }
    except stripe.error.StripeError as e:
        # Log error but don't fail onboarding
        log_error(f"Stripe setup failed for user {user_id}: {str(e)}")
        return {"error": "trial_setup_failed", "message": str(e)}
@hy.task
def create_sample_data(user_id: str):
    """Seed a new user's account with example projects and a starter team.

    Each sample project is created for the user and then populated from its
    template; afterwards a team named "My Team" is created.

    Returns:
        A dict with the created project ids under ``"sample_projects"`` and
        ``"sample_team_created": True``.
    """
    def _spec(name, description, template):
        # Every sample project shares the same shape and is flagged as sample data.
        return {
            "name": name,
            "description": description,
            "template": template,
            "sample_data": True,
        }

    project_specs = (
        _spec("My First Project", "Get started with this example project", "getting_started"),
        _spec("Demo Workflow", "See how automation works with this demo", "automation_demo"),
    )

    project_ids = []
    for spec in project_specs:
        new_project_id = create_user_project(user_id, spec)
        populate_sample_content(new_project_id, spec["template"])
        project_ids.append(new_project_id)

    create_sample_team(user_id, "My Team")

    return {"sample_projects": project_ids, "sample_team_created": True}
@hy.task
def schedule_followup_emails(user_id: str, user_email: str, user_name: str):
    """Queue the follow-up onboarding emails at fixed offsets from now.

    Each entry is scheduled through ``send_followup_email.send_at`` at
    24, 72, 168 and 312 hours after the moment it is processed.

    Returns:
        A dict with the number of scheduled emails and, per email, its
        task id, template name and ISO-formatted scheduled time.
    """
    # (delay in hours, template name, subject line)
    sequence = (
        (24, "day_1_tips", "Quick tips to get the most out of your trial"),
        (72, "day_3_features", "Unlock powerful features you might have missed"),
        (168, "week_1_checkin", "How's your first week going?"),
        (312, "trial_ending", "Your trial ends soon - let's keep you going!"),
    )

    scheduled = []
    for delay_hours, template_name, subject_line in sequence:
        send_time = datetime.now() + timedelta(hours=delay_hours)
        handle = send_followup_email.send_at(
            send_time,
            user_id,
            user_email,
            user_name,
            template_name,
            subject_line,
        )
        scheduled.append({
            "task_id": handle.task_id,
            "template": template_name,
            "scheduled_for": send_time.isoformat(),
        })

    return {"scheduled_emails": len(scheduled), "tasks": scheduled}
@hy.task
def send_followup_email(user_id: str, user_email: str, user_name: str, template_name: str, subject: str):
    """Send one follow-up email unless the user has unsubscribed or churned.

    Returns:
        ``{"skipped": True, "reason": ...}`` when the user's status has
        ``unsubscribed`` or ``churned`` set; otherwise
        ``{"email_sent": True, "template": template_name}`` after the
        templated email has been sent.
    """
    status = get_user_status(user_id)

    # Guard: do not mail users who opted out or left.
    opted_out = status['unsubscribed'] or status['churned']
    if opted_out:
        return {"skipped": True, "reason": f"User status: {status}"}

    # Personalize the template with the user's activity and account age.
    activity = get_user_activity_summary(user_id)
    account_age = datetime.now() - status['signup_date']
    context = dict(
        user_name=user_name,
        user_id=user_id,
        activity_summary=activity,
        days_since_signup=account_age.days,
    )

    send_template_email(user_email, subject, template_name, context)
    return {"email_sent": True, "template": template_name}
@hy.task
def track_onboarding_completion(user_id: str):
    """Compute and persist how far a user is through onboarding.

    The completion percentage is the share of the four criteria below that
    ``check_user_progress`` reports as completed. At 75% or more the
    onboarding success email task is queued. The percentage, the completed
    steps and a timestamp are then written via
    ``update_user_onboarding_status``.

    Returns:
        The progress mapping from ``check_user_progress`` extended with a
        ``completion_percentage`` key, so callers that read that key (such
        as ``onboard_new_user``) get the computed value rather than a
        default.
    """
    completion_criteria = [
        'profile_completed',
        'first_project_created',
        'team_member_invited',
        'first_workflow_run'
    ]
    user_progress = check_user_progress(user_id, completion_criteria)
    completion_percentage = (user_progress['completed_count'] / len(completion_criteria)) * 100

    if completion_percentage >= 75:
        # User is well onboarded, send success email
        send_onboarding_success_email.send(user_id)

    update_user_onboarding_status(user_id, {
        'completion_percentage': completion_percentage,
        'completed_steps': user_progress['completed_steps'],
        'last_updated': datetime.now().isoformat()
    })

    # Return a superset of the original result: previously the percentage was
    # computed and stored but never returned.
    return {**user_progress, 'completion_percentage': completion_percentage}
@hy.task
def onboard_new_user(user_id: str, user_email: str, user_name: str, plan_type: str = "basic"):
    """Orchestrate the complete onboarding process for one user.

    Runs the five onboarding tasks in order, reading each one's result via
    ``.send(...).get()`` before starting the next: welcome email, trial
    setup, sample data, follow-up scheduling and progress tracking. A
    summary is then recorded with ``record_onboarding_completion``.

    Args:
        user_id: Identifier of the user being onboarded.
        user_email: Address used for the welcome and follow-up emails.
        user_name: Display name used in the emails.
        plan_type: Plan passed to the trial setup step.

    Returns:
        A dict with ``status`` ``"onboarding_completed"``, the user id, the
        elapsed seconds, the trial step's result, the created sample project
        ids and the number of scheduled follow-ups.

    Raises:
        Exception: Any exception from a step is recorded as a failed
            onboarding, reported through ``send_onboarding_failure_alert``
            and then re-raised unchanged.
    """
    onboarding_start = datetime.now()
    try:
        # Step 1: Send welcome email
        welcome_result = send_welcome_email.send(user_id, user_email, user_name).get()
        # Step 2: Setup trial subscription
        trial_result = setup_user_trial.send(user_id, user_email, plan_type).get()
        # Step 3: Create sample data and projects
        sample_data_result = create_sample_data.send(user_id).get()
        # Step 4: Schedule follow-up email sequence
        followup_result = schedule_followup_emails.send(user_id, user_email, user_name).get()
        # Step 5: Initialize progress tracking
        progress_result = track_onboarding_completion.send(user_id).get()

        onboarding_duration = (datetime.now() - onboarding_start).total_seconds()

        # Record successful onboarding. The trial step returns an error dict
        # rather than raising, hence the customer_id check.
        record_onboarding_completion(user_id, {
            'status': 'success',
            'duration_seconds': onboarding_duration,
            'welcome_email': welcome_result.get('email_sent', False),
            'trial_setup': trial_result.get('customer_id') is not None,
            'sample_data': len(sample_data_result.get('sample_projects', [])),
            'scheduled_emails': followup_result.get('scheduled_emails', 0),
            'completion_percentage': progress_result.get('completion_percentage', 0)
        })
        return {
            "status": "onboarding_completed",
            "user_id": user_id,
            "duration_seconds": onboarding_duration,
            "trial_info": trial_result,
            "sample_projects": sample_data_result['sample_projects'],
            "scheduled_followups": followup_result['scheduled_emails']
        }
    except Exception as e:
        # Record failed onboarding
        record_onboarding_completion(user_id, {
            'status': 'failed',
            'error': str(e),
            'duration_seconds': (datetime.now() - onboarding_start).total_seconds()
        })
        # Send alert to team
        send_onboarding_failure_alert(user_id, user_email, str(e))
        # Bare raise re-raises the active exception with its original traceback.
        raise
# Scheduled task: cron expression "0 9 * * *" (minute 0, hour 9, every day)
@hy.cron("0 9 * * *")
def process_trial_expirations():
    """Queue an expiration notice for every trial ending within three days.

    Returns:
        ``{"processed_trials": <number of trials found>}``.
    """
    trials = get_trials_expiring_soon(days=3)

    # Positional arguments for the notice task, in the order it is called with.
    notice_fields = ('user_id', 'user_email', 'user_name', 'days_remaining')
    for trial in trials:
        notice_args = [trial[field] for field in notice_fields]
        send_trial_expiration_notice.send(*notice_args)

    return {"processed_trials": len(trials)}
REST API Endpoints
Copy
Ask AI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from .tasks import onboard_new_user, send_welcome_email, setup_user_trial
# FastAPI application object; the onboarding routes below are registered on it.
app = FastAPI()
class UserOnboardingRequest(BaseModel):
    """Request body for POST /onboarding/start (full onboarding workflow)."""

    # Identity of the user to onboard.
    user_id: str
    user_email: str
    user_name: str

    # Plan passed to the onboarding task; "basic" when omitted.
    plan_type: str = "basic"
class WelcomeEmailRequest(BaseModel):
    """Request body for POST /onboarding/welcome-email."""

    # Identity of the user the welcome email is for.
    user_id: str
    user_email: str
    user_name: str
class TrialSetupRequest(BaseModel):
    """Request body for POST /onboarding/setup-trial."""

    # Identity of the user the trial is created for.
    user_id: str
    user_email: str

    # Plan passed to the trial setup task; "basic" when omitted.
    plan_type: str = "basic"
@app.post("/onboarding/start")
async def start_user_onboarding(request: UserOnboardingRequest):
    """Trigger the complete user onboarding workflow.

    Queues ``onboard_new_user`` with the request's fields and returns
    immediately with the queued task's id.

    Raises:
        HTTPException: 500 with the error text when queueing fails; the
            original exception is chained as the cause.
    """
    try:
        task = onboard_new_user.send(
            request.user_id,
            request.user_email,
            request.user_name,
            request.plan_type
        )
        return {
            "message": "Onboarding started successfully",
            "task_id": task.task_id,
            "user_id": request.user_id,
            "plan_type": request.plan_type
        }
    except Exception as e:
        # Chain the cause so logs keep the original traceback.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/onboarding/welcome-email")
async def send_welcome_email_only(request: WelcomeEmailRequest):
    """Queue just the welcome email for a user.

    Returns:
        A confirmation message, the queued task's id and the recipient.

    Raises:
        HTTPException: 500 with the error text when queueing fails; the
            original exception is chained as the cause.
    """
    try:
        task = send_welcome_email.send(
            request.user_id,
            request.user_email,
            request.user_name
        )
        return {
            "message": "Welcome email queued successfully",
            "task_id": task.task_id,
            "recipient": request.user_email
        }
    except Exception as e:
        # Chain the cause so logs keep the original traceback.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/onboarding/setup-trial")
async def setup_trial_subscription(request: TrialSetupRequest):
    """Queue only the trial subscription setup for a user.

    Returns:
        A confirmation message, the queued task's id, the user id and the
        requested plan type.

    Raises:
        HTTPException: 500 with the error text when queueing fails; the
            original exception is chained as the cause.
    """
    try:
        task = setup_user_trial.send(
            request.user_id,
            request.user_email,
            request.plan_type
        )
        return {
            "message": "Trial setup started",
            "task_id": task.task_id,
            "user_id": request.user_id,
            "plan_type": request.plan_type
        }
    except Exception as e:
        # Chain the cause so logs keep the original traceback.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/onboarding/status/{task_id}")
async def get_onboarding_status(task_id: str):
    """Return the status of an onboarding task.

    Returns:
        The task id, its status, its result (only when complete), its error
        (only when failed) and its creation/completion timestamps.

    Raises:
        HTTPException: 404 "Task not found" when the lookup fails; the
            original exception is chained as the cause.
    """
    # This module's top-level import from .tasks does not bring in `hy`;
    # without this import the lookup below raised NameError, which the broad
    # except turned into a 404 for every request.
    from .tasks import hy

    try:
        task = hy.get_task(task_id)
        return {
            "task_id": task_id,
            "status": task.status,
            "result": task.result if task.is_complete else None,
            "error": task.error if task.has_failed else None,
            "created_at": task.created_at,
            "completed_at": task.completed_at
        }
    except Exception as e:
        # NOTE(review): every failure is reported as 404; narrow this once the
        # registry's "not found" exception type is confirmed.
        raise HTTPException(status_code=404, detail="Task not found") from e
@app.get("/onboarding/progress/{user_id}")
async def get_user_onboarding_progress(user_id: str):
    """Return a user's stored onboarding progress.

    Returns:
        The user id plus ``completion_percentage`` (default 0),
        ``completed_steps`` (default ``[]``), ``last_updated`` and
        ``trial_info`` (default ``{}``) read from the stored status.

    Raises:
        HTTPException: 404 "User not found" when the lookup fails; the
            original exception is chained as the cause.
    """
    try:
        progress = get_user_onboarding_status(user_id)
        return {
            "user_id": user_id,
            "completion_percentage": progress.get('completion_percentage', 0),
            "completed_steps": progress.get('completed_steps', []),
            "last_updated": progress.get('last_updated'),
            "trial_info": progress.get('trial_info', {})
        }
    except Exception as e:
        # NOTE(review): every failure is reported as 404; narrow this once the
        # lookup's "not found" exception type is confirmed.
        raise HTTPException(status_code=404, detail="User not found") from e
Usage Examples
Complete Onboarding Flow
Copy
Ask AI
# Start full onboarding for new user; the JSON response includes the queued task_id
curl -X POST http://localhost:8000/onboarding/start \
-H "Content-Type: application/json" \
-d '{
"user_id": "user_12345",
"user_email": "john.doe@example.com",
"user_name": "John Doe",
"plan_type": "pro"
}'
# Check onboarding status (substitute the task_id returned by /onboarding/start for task_67890)
curl http://localhost:8000/onboarding/status/task_67890
# Get user's progress
curl http://localhost:8000/onboarding/progress/user_12345
Individual Components
Copy
Ask AI
# Send welcome email only (queues the email task without running the rest of onboarding)
curl -X POST http://localhost:8000/onboarding/welcome-email \
-H "Content-Type: application/json" \
-d '{
"user_id": "user_12345",
"user_email": "john.doe@example.com",
"user_name": "John Doe"
}'
# Setup trial subscription only ("plan_type" is optional and defaults to "basic")
curl -X POST http://localhost:8000/onboarding/setup-trial \
-H "Content-Type: application/json" \
-d '{
"user_id": "user_12345",
"user_email": "john.doe@example.com",
"plan_type": "basic"
}'
Advanced Onboarding Patterns
Personalized Onboarding
Copy
Ask AI
@hy.task
def personalized_onboarding(user_id: str, user_profile: dict):
    """Queue the onboarding tasks that match the user's profile.

    Developers get developer samples and a technical welcome; otherwise
    enterprise-sized companies get enterprise features and a demo call;
    everyone else gets the standard sample data.

    Returns:
        ``{"onboarding_type": ...}`` as computed by ``get_onboarding_type``.
    """
    is_developer = user_profile.get('role') == 'developer'

    if is_developer:
        create_developer_samples.send(user_id)
        send_technical_welcome.send(user_id, user_profile['email'])
    else:
        is_enterprise = user_profile.get('company_size') == 'enterprise'
        if is_enterprise:
            setup_enterprise_features.send(user_id)
            schedule_demo_call.send(user_id, user_profile)
        else:
            # Standard onboarding
            create_sample_data.send(user_id)

    onboarding_type = get_onboarding_type(user_profile)
    return {"onboarding_type": onboarding_type}
A/B Testing Integration
Copy
Ask AI
@hy.task
def ab_test_onboarding(user_id: str, user_email: str, user_name: str):
    """Run the onboarding flow chosen by the 'onboarding_flow_v2' A/B test.

    The 'control' variant runs ``onboard_new_user``; any other variant runs
    ``experimental_onboarding_flow``. The outcome (completed flag and
    duration) is then tracked against the variant.

    Returns:
        The result of whichever flow was run.
    """
    experiment = 'onboarding_flow_v2'
    variant = get_ab_test_variant(user_id, experiment)

    # Pick the flow for this variant, then run it and read its result.
    flow = onboard_new_user if variant == 'control' else experimental_onboarding_flow
    result = flow.send(user_id, user_email, user_name).get()

    outcome = {
        'completed': result['status'] == 'onboarding_completed',
        'duration': result.get('duration_seconds', 0),
    }
    track_ab_test_outcome(user_id, experiment, variant, outcome)

    return result
Cohort-Based Onboarding
Copy
Ask AI
@hy.task
def cohort_onboarding_analysis():
    """Compute onboarding statistics for each user cohort of the last 30 days.

    For every cohort the user count, completion rate, average completion
    time and trial conversion rate are collected; the full list is queued
    for the cohort analysis report and also returned.
    """
    def _stats_for(cohort):
        # One row of the report for a single cohort.
        members = cohort['users']
        return {
            'cohort_date': cohort['date'],
            'user_count': len(members),
            'completion_rate': calculate_completion_rate(members),
            'avg_time_to_complete': calculate_avg_completion_time(members),
            'trial_conversion_rate': calculate_trial_conversion(members),
        }

    report_rows = [_stats_for(cohort) for cohort in get_recent_user_cohorts(days=30)]

    # Hand the rows to the report task for the product team.
    send_cohort_analysis_report.send(report_rows)
    return report_rows
Email Templates
Welcome Email Template
Copy
Ask AI
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
.container { max-width: 600px; margin: 0 auto; font-family: Arial, sans-serif; }
.header { background: #f8f9fa; padding: 20px; text-align: center; }
.content { padding: 20px; }
.cta-button {
display: inline-block;
background: #007cba;
color: white;
padding: 12px 24px;
text-decoration: none;
border-radius: 4px;
margin: 20px 0;
}
.checklist { background: #f8f9fa; padding: 15px; margin: 20px 0; }
.checklist li { margin: 8px 0; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>Welcome to [Company Name]! 🎉</h1>
</div>
<div class="content">
<p>Hi {{user_name}},</p>
<p>Welcome aboard! We're thrilled to have you join our community of [user_count] users who are already transforming their workflows.</p>
<div class="checklist">
<h3>Your Quick Start Checklist:</h3>
<ul>
<li>✅ Account created (you're here!)</li>
<li>☐ Complete your profile</li>
<li>🚀 Try your first project</li>
<li>👥 Invite your team</li>
</ul>
</div>
<p>We've set up some sample projects to help you get started. Your 14-day free trial gives you full access to all features - no credit card required.</p>
<a href="{{dashboard_url}}" class="cta-button">Get Started →</a>
<p>Questions? Just reply to this email and our team will help you out.</p>
<p>Best,<br>The [Company] Team</p>
</div>
</div>
</body>
</html>
Monitoring & Analytics
Copy
Ask AI
@hy.task
def generate_onboarding_metrics():
    """Collect onboarding metrics, publish them and alert on a low completion rate.

    The metrics are sent to the analytics dashboard; when the completion
    rate is below 0.75 an alert is also sent to the team.

    Returns:
        The metrics dict.
    """
    window_days = 30
    completion_floor = 0.75

    metrics = {}
    metrics['total_signups_last_30_days'] = count_signups(days=window_days)
    metrics['onboarding_completion_rate'] = calculate_completion_rate(days=window_days)
    metrics['average_onboarding_time'] = calculate_avg_onboarding_time(days=window_days)
    metrics['email_open_rates'] = get_email_open_rates('welcome_sequence')
    metrics['trial_to_paid_conversion'] = calculate_trial_conversion(days=window_days)
    metrics['drop_off_points'] = identify_drop_off_points(days=window_days)

    # Send to analytics dashboard
    send_to_analytics_dashboard(metrics)

    # Alert if the completion rate is below the threshold
    completion_rate = metrics['onboarding_completion_rate']
    if completion_rate < completion_floor:
        send_alert_to_team("Onboarding completion rate below 75%", metrics)

    return metrics
Production Considerations
- Email deliverability: Use reputable email services and monitor deliverability
- Progressive enhancement: Don't let single failures block the entire onboarding
- A/B testing: Continuously optimize email templates and onboarding flows
- Personalization: Tailor onboarding based on user profile and behavior
- Performance monitoring: Track completion rates, time-to-value, and drop-off points
- Compliance: Ensure email subscriptions comply with GDPR and CAN-SPAM