Build reliable background tasks for common maintenance operations like database backups, file cleanup, analytics reporting, and media processing.

Overview

This example demonstrates how to automate essential maintenance tasks:
  • Database backups with S3 storage
  • File cleanup for temp files and logs
  • Analytics reporting with email notifications
  • Video thumbnail generation for media assets
  • Scheduled cron jobs for recurring maintenance
Perfect for keeping your production systems clean, backed up, and monitored.

Task Definitions

from hyrex import HyrexRegistry
from datetime import datetime, timedelta
import requests
import os
import subprocess

hy = HyrexRegistry()

@hy.task
def send_daily_report_email(recipient_email: str, report_data: dict):
    """Send daily analytics report via email"""
    date_str = datetime.now().strftime('%Y-%m-%d')
    email_content = f"Daily Analytics Report - {date_str}\\n\\n"
    email_content += f"Active Users: {report_data['active_users']}\\n"
    email_content += f"Revenue: ${report_data['revenue']:,.2f}\\n"
    email_content += f"New Signups: {report_data['new_signups']}\\n"
    email_content += f"Support Tickets: {report_data['support_tickets']}\\n\\n"
    email_content += "View full dashboard: https://yourapp.com/analytics"
    
    send_email(
        to=recipient_email,
        subject=f"Daily Report - {datetime.now().strftime('%m/%d/%Y')}",
        body=email_content
    )

@hy.task
def backup_database(database_name: str, s3_bucket: str):
    """Create database backup and upload to S3"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_filename = f"{database_name}_{timestamp}.sql"
    
    backup_path = f"/tmp/{backup_filename}"
    
    # Create database dump; check=True raises if pg_dump fails
    subprocess.run(f"pg_dump {database_name} > {backup_path}", shell=True, check=True)
    
    # Upload to S3
    upload_to_s3(backup_path, s3_bucket, f"backups/{backup_filename}")
    
    # Record the size before deleting the local copy
    size_mb = get_file_size_mb(backup_path)
    os.remove(backup_path)
    
    return {"backup_file": backup_filename, "size_mb": size_mb}

@hy.task
def cleanup_old_files(directory: str, days_old: int = 30):
    """Remove files older than specified days"""
    cutoff_date = datetime.now() - timedelta(days=days_old)
    removed_files = []
    
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_modified < cutoff_date:
                os.remove(file_path)
                removed_files.append(filename)
    
    return {"removed_files": removed_files, "count": len(removed_files)}

@hy.task
def process_video_thumbnail(video_url: str, output_bucket: str):
    """Generate video thumbnail and upload to storage"""
    import cv2
    import hashlib
    import tempfile
    
    # Download video to temp file
    video_response = requests.get(video_url)
    with tempfile.NamedTemporaryFile(suffix='.mp4') as temp_video:
        temp_video.write(video_response.content)
        temp_video.flush()
        
        # Extract frame at 2 seconds
        cap = cv2.VideoCapture(temp_video.name)
        cap.set(cv2.CAP_PROP_POS_MSEC, 2000)  # 2 seconds
        ret, frame = cap.read()
        
        cap.release()
        
        if not ret:
            raise ValueError(f"Could not extract a frame from {video_url}")
        
        # Use a stable content hash; Python's built-in hash() is salted per process
        digest = hashlib.md5(video_url.encode()).hexdigest()
        thumbnail_filename = f"thumb_{digest}.jpg"
        with tempfile.NamedTemporaryFile(suffix='.jpg') as temp_thumb:
            cv2.imwrite(temp_thumb.name, frame)
            upload_to_s3(temp_thumb.name, output_bucket, f"thumbnails/{thumbnail_filename}")
        
    return {"thumbnail_url": f"s3://{output_bucket}/thumbnails/{thumbnail_filename}"}

@hy.task
def generate_analytics_report():
    """Generate daily analytics data"""
    report_data = {
        "active_users": get_active_users_count(),
        "revenue": get_daily_revenue(),
        "new_signups": get_new_signups_count(),
        "support_tickets": get_open_tickets_count()
    }
    
    # Store in database
    store_daily_metrics(report_data)
    
    # Send to stakeholders
    stakeholder_emails = ["ceo@company.com", "cto@company.com", "head-of-product@company.com"]
    for email in stakeholder_emails:
        send_daily_report_email.send(email, report_data)
    
    return report_data

# Schedule recurring tasks
@hy.cron("0 9 * * *")  # Every day at 9 AM
def daily_tasks():
    """Run daily maintenance tasks"""
    generate_analytics_report.send()
    backup_database.send("production_db", "company-backups")
    cleanup_old_files.send("/app/temp", 7)  # Clean files older than 7 days

@hy.cron("0 2 * * 0")  # Every Sunday at 2 AM
def weekly_tasks():
    """Run weekly maintenance tasks"""
    cleanup_old_files.send("/app/logs", 30)  # Clean logs older than 30 days
    backup_database.send("analytics_db", "company-backups")

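The tasks above lean on a handful of helpers (send_email, upload_to_s3, get_file_size_mb, and the metrics getters) that are left undefined. Here is a minimal sketch of the storage and email helpers, assuming boto3 for S3 access and a local SMTP relay; swap in whatever infrastructure you actually use:

import os
import smtplib
from email.message import EmailMessage

import boto3

def upload_to_s3(local_path: str, bucket: str, key: str):
    """Upload a local file to S3 (assumes AWS credentials are configured)"""
    s3 = boto3.client("s3")
    s3.upload_file(local_path, bucket, key)

def get_file_size_mb(path: str) -> float:
    """Return a file's size in megabytes"""
    return os.path.getsize(path) / (1024 * 1024)

def send_email(to: str, subject: str, body: str):
    """Send a plain-text email through a local SMTP relay (placeholder transport)"""
    msg = EmailMessage()
    msg["From"] = "reports@yourapp.com"
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content(body)
    with smtplib.SMTP("localhost") as smtp:
        smtp.send_message(msg)
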
REST API Endpoints

Provide on-demand task triggering through REST endpoints:

from fastapi import FastAPI
from pydantic import BaseModel
from .tasks import backup_database, cleanup_old_files, process_video_thumbnail

app = FastAPI()

class BackupRequest(BaseModel):
    database_name: str
    s3_bucket: str
    
class CleanupRequest(BaseModel):
    directory: str
    days_old: int = 30
    
class VideoProcessRequest(BaseModel):
    video_url: str
    output_bucket: str

@app.post("/tasks/backup")
async def trigger_backup(request: BackupRequest):
    """Trigger database backup"""
    task = backup_database.send(request.database_name, request.s3_bucket)
    
    return {
        "message": "Backup task started",
        "task_id": task.task_id,
        "database": request.database_name
    }

@app.post("/tasks/cleanup")
async def trigger_cleanup(request: CleanupRequest):
    """Trigger file cleanup"""
    task = cleanup_old_files.send(request.directory, request.days_old)
    
    return {
        "message": "Cleanup task started",
        "task_id": task.task_id,
        "directory": request.directory
    }

@app.post("/tasks/process-video")
async def process_video(request: VideoProcessRequest):
    """Process video thumbnail generation"""
    task = process_video_thumbnail.send(request.video_url, request.output_bucket)
    
    return {
        "message": "Video processing started",
        "task_id": task.task_id,
        "video_url": request.video_url
    }

@app.get("/tasks/status/{task_id}")
async def get_task_status(task_id: str):
    """Get status of background task"""
    task = hy.get_task(task_id)
    
    return {
        "task_id": task_id,
        "status": task.status,
        "progress": task.progress,
        "result": task.result if task.is_complete else None,
        "created_at": task.created_at,
        "completed_at": task.completed_at
    }

Usage Examples

Trigger Tasks via API

# Backup production database
curl -X POST http://localhost:8000/tasks/backup \
  -H "Content-Type: application/json" \
  -d '{
    "database_name": "production_db",
    "s3_bucket": "company-backups"
  }'

# Clean up old temp files
curl -X POST http://localhost:8000/tasks/cleanup \
  -H "Content-Type: application/json" \
  -d '{
    "directory": "/app/temp",
    "days_old": 7
  }'

# Generate video thumbnail
curl -X POST http://localhost:8000/tasks/process-video \
  -H "Content-Type: application/json" \
  -d '{
    "video_url": "https://example.com/video.mp4",
    "output_bucket": "media-thumbnails"
  }'

# Check task status
curl http://localhost:8000/tasks/status/12345
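
The same flow works from Python. A rough sketch with requests that triggers a backup and polls the status endpoint until a result appears (the 10-second interval is arbitrary, and a real client should also bail out on failed tasks):

import time
import requests

BASE_URL = "http://localhost:8000"

# Kick off a backup and capture the task ID from the response
response = requests.post(f"{BASE_URL}/tasks/backup", json={
    "database_name": "production_db",
    "s3_bucket": "company-backups",
})
task_id = response.json()["task_id"]

# Poll until the status endpoint reports a result
while True:
    status = requests.get(f"{BASE_URL}/tasks/status/{task_id}").json()
    if status["result"] is not None:
        print("Backup finished:", status["result"])
        break
    time.sleep(10)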

Monitor Scheduled Tasks

Cron jobs run automatically, but you can monitor them through Hyrex Studio:
  • Daily tasks run every morning at 9 AM
  • Weekly tasks run every Sunday at 2 AM
  • View execution history and performance metrics
  • Set up alerts for failed tasks

Cron Schedule Reference

Schedule          Description                 Example
"0 9 * * *"       Daily at 9 AM               Daily reports
"0 2 * * 0"       Weekly, Sunday at 2 AM      Weekly cleanup
"0 0 1 * *"       Monthly, 1st at midnight    Monthly billing
"*/15 * * * *"    Every 15 minutes            Health checks
"0 */6 * * *"     Every 6 hours               Log rotation

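As a worked example, the every-15-minutes entry pairs naturally with a lightweight health check. This sketch follows the same pattern as the daily and weekly jobs above; the URL and the check_service_health task are placeholders:

@hy.task
def check_service_health(url: str):
    """Hit a health endpoint and fail loudly if it does not respond"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return {"url": url, "status_code": response.status_code}

@hy.cron("*/15 * * * *")  # Every 15 minutes
def health_checks():
    """Run recurring service health checks"""
    check_service_health.send("https://yourapp.com/health")
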
Production Considerations

  • Backup retention: Implement S3 lifecycle policies to archive old backups (see the sketch after this list)
  • Error handling: Set up alerts for failed maintenance tasks
  • Resource usage: Schedule heavy tasks during low-traffic periods
  • Monitoring: Track backup sizes, cleanup counts, and processing times
  • Security: Use IAM roles for S3 access, secure database credentials
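
For the retention point, a lifecycle rule can move old backups to Glacier and eventually expire them. A one-time setup sketch with boto3, reusing the company-backups bucket and backups/ prefix from above; the 30- and 365-day thresholds are placeholders for your own policy:

import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="company-backups",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "archive-old-backups",
            "Filter": {"Prefix": "backups/"},
            "Status": "Enabled",
            # Move backups to Glacier after 30 days...
            "Transitions": [{"Days": 30, "StorageClass": "GLACIER"}],
            # ...and expire them entirely after a year
            "Expiration": {"Days": 365},
        }]
    },
)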

Next Steps