Background Tasks with Hyrex

Every application needs tasks that run behind the scenes - database backups, report generation, file processing, cleanup jobs. But managing these tasks reliably across different servers and time zones is complex. What if background tasks just worked?

With Hyrex, you can schedule recurring tasks, trigger one-off jobs, and process workloads asynchronously with built-in retry logic and monitoring. Turn unreliable cron jobs and manual processes into robust, observable background workflows that never miss a beat.

Background Tasks Scheduler

Step 1: Define Your Background Tasks

Create Hyrex tasks for all your background operations - from simple file cleanup to complex report generation. These tasks can be triggered on-demand or scheduled to run automatically with cron expressions, complete with retry logic and error handling.

src/hyrex/background_tasks.py
from hyrex import HyrexRegistry
from datetime import datetime, timedelta
import hashlib
import os
import requests

# send_email, upload_to_s3, store_daily_metrics, and the get_* metric helpers
# are assumed to be defined elsewhere in your application.
hy = HyrexRegistry()

@hy.task
def send_daily_report_email(recipient_email: str, report_data: dict):
    """Send daily analytics report via email"""
    date_str = datetime.now().strftime('%Y-%m-%d')
    email_content = f"Daily Analytics Report - {date_str}\n\n"
    email_content += f"Active Users: {report_data['active_users']}\n"
    email_content += f"Revenue: ${report_data['revenue']:,.2f}\n"
    email_content += f"New Signups: {report_data['new_signups']}\n"
    email_content += f"Support Tickets: {report_data['support_tickets']}\n\n"
    email_content += "View full dashboard: https://yourapp.com/analytics"

    send_email(
        to=recipient_email,
        subject=f"Daily Report - {datetime.now().strftime('%m/%d/%Y')}",
        body=email_content
    )

@hy.task
def backup_database(database_name: str, s3_bucket: str):
    """Create database backup and upload to S3"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_filename = f"{database_name}_{timestamp}.sql"
    backup_path = f"/tmp/{backup_filename}"

    # Create database dump
    os.system(f"pg_dump {database_name} > {backup_path}")

    # Record the size before the local copy is deleted
    size_mb = os.path.getsize(backup_path) / (1024 * 1024)

    # Upload to S3
    upload_to_s3(backup_path, s3_bucket, f"backups/{backup_filename}")

    # Clean up local file
    os.remove(backup_path)

    return {"backup_file": backup_filename, "size_mb": size_mb}

@hy.task
def cleanup_old_files(directory: str, days_old: int = 30):
    """Remove files older than specified days"""
    cutoff_date = datetime.now() - timedelta(days=days_old)
    removed_files = []

    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_modified < cutoff_date:
                os.remove(file_path)
                removed_files.append(filename)

    return {"removed_files": removed_files, "count": len(removed_files)}

@hy.task
def process_video_thumbnail(video_url: str, output_bucket: str):
    """Generate video thumbnail and upload to storage"""
    import cv2
    import tempfile

    # Download video to temp file
    video_response = requests.get(video_url)
    with tempfile.NamedTemporaryFile(suffix='.mp4') as temp_video:
        temp_video.write(video_response.content)
        temp_video.flush()

        # Extract frame at 2 seconds
        cap = cv2.VideoCapture(temp_video.name)
        cap.set(cv2.CAP_PROP_POS_MSEC, 2000)  # 2 seconds
        ret, frame = cap.read()
        cap.release()

        if not ret:
            raise ValueError(f"Could not read a frame from {video_url}")

        # Use a stable digest for the filename; Python's built-in hash()
        # is randomized per process, so reruns would produce new names
        url_hash = hashlib.md5(video_url.encode()).hexdigest()[:12]
        thumbnail_filename = f"thumb_{url_hash}.jpg"
        with tempfile.NamedTemporaryFile(suffix='.jpg') as temp_thumb:
            cv2.imwrite(temp_thumb.name, frame)
            upload_to_s3(temp_thumb.name, output_bucket, f"thumbnails/{thumbnail_filename}")

    return {"thumbnail_url": f"s3://{output_bucket}/thumbnails/{thumbnail_filename}"}

@hy.task
def generate_analytics_report():
    """Generate daily analytics data"""
    report_data = {
        "active_users": get_active_users_count(),
        "revenue": get_daily_revenue(),
        "new_signups": get_new_signups_count(),
        "support_tickets": get_open_tickets_count()
    }

    # Store in database
    store_daily_metrics(report_data)

    # Send to stakeholders
    stakeholder_emails = ["ceo@company.com", "cto@company.com", "head-of-product@company.com"]
    for email in stakeholder_emails:
        send_daily_report_email.send(email, report_data)

    return report_data

# Schedule recurring tasks
@hy.cron("0 9 * * *")  # Every day at 9 AM
def daily_tasks():
    """Run daily maintenance tasks"""
    generate_analytics_report.send()
    backup_database.send("production_db", "company-backups")
    cleanup_old_files.send("/app/temp", 7)  # Clean files older than 7 days

@hy.cron("0 2 * * 0")  # Every Sunday at 2 AM
def weekly_tasks():
    """Run weekly maintenance tasks"""
    cleanup_old_files.send("/app/logs", 30)  # Clean logs older than 30 days
    backup_database.send("analytics_db", "company-backups")
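
The tasks above lean on Hyrex's built-in retry handling. Hyrex exposes per-task configuration, but the exact decorator keywords can vary by version; the sketch below assumes a max_retries keyword and uses a hypothetical flaky_network_task, so treat it as a pattern to confirm against the Hyrex documentation rather than verified API.

# Sketch only: the max_retries keyword is an assumption - confirm in the Hyrex docs
@hy.task(max_retries=3)
def flaky_network_task(url: str):
    """Raise on failure so the worker records the attempt and retries"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # an exception marks this attempt failed
    return response.json()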

Step 2: Build Task Management APIs

Create endpoints to trigger background tasks on-demand and monitor their progress. These APIs let you integrate background processing into your application workflows and provide visibility into task execution and results.

src/routes/tasks_api.py
from fastapi import FastAPI
from pydantic import BaseModel

# Import the registry and tasks defined in Step 1 (src/hyrex/background_tasks.py)
from ..hyrex.background_tasks import (
    hy,
    backup_database,
    cleanup_old_files,
    process_video_thumbnail,
)

app = FastAPI()

class BackupRequest(BaseModel):
    database_name: str
    s3_bucket: str

class CleanupRequest(BaseModel):
    directory: str
    days_old: int = 30

class VideoProcessRequest(BaseModel):
    video_url: str
    output_bucket: str

@app.post("/tasks/backup")
async def trigger_backup(request: BackupRequest):
    """Trigger database backup"""
    task = backup_database.send(request.database_name, request.s3_bucket)

    return {
        "message": "Backup task started",
        "task_id": task.task_id,
        "database": request.database_name
    }

@app.post("/tasks/cleanup")
async def trigger_cleanup(request: CleanupRequest):
    """Trigger file cleanup"""
    task = cleanup_old_files.send(request.directory, request.days_old)

    return {
        "message": "Cleanup task started",
        "task_id": task.task_id,
        "directory": request.directory
    }

@app.post("/tasks/process-video")
async def process_video(request: VideoProcessRequest):
    """Process video thumbnail generation"""
    task = process_video_thumbnail.send(request.video_url, request.output_bucket)

    return {
        "message": "Video processing started",
        "task_id": task.task_id,
        "video_url": request.video_url
    }

@app.get("/tasks/status/{task_id}")
async def get_task_status(task_id: str):
    """Get status of background task"""
    task = hy.get_task(task_id)

    return {
        "task_id": task_id,
        "status": task.status,
        "progress": task.progress,
        "result": task.result if task.is_complete else None,
        "created_at": task.created_at,
        "completed_at": task.completed_at
    }
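
To exercise these endpoints, call them like any other HTTP API. Here is a minimal client sketch, assuming the FastAPI app is running locally at http://localhost:8000: it triggers a backup, then polls the status endpoint until a completion time appears.

import time
import requests

BASE_URL = "http://localhost:8000"  # assumed local dev address

# Kick off a backup and capture the task ID from the response
resp = requests.post(
    f"{BASE_URL}/tasks/backup",
    json={"database_name": "production_db", "s3_bucket": "company-backups"},
)
task_id = resp.json()["task_id"]

# Poll until the status endpoint reports completion
while True:
    status = requests.get(f"{BASE_URL}/tasks/status/{task_id}").json()
    if status["completed_at"] is not None:
        print("Backup finished:", status["result"])
        break
    time.sleep(5)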

Your background tasks are now bulletproof!

No more wondering if your backup ran last night or if that cleanup job actually executed. Your background tasks are scheduled reliably, run on distributed workers, and provide full observability into their execution status and results.
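
Note that tasks only execute once at least one Hyrex worker process is pointed at the registry from Step 1. The invocation below follows the module:registry form from Hyrex's getting-started examples, but the exact CLI is an assumption to verify against the docs for your version:

# Assumed CLI form - confirm against the Hyrex documentation
hyrex run-worker src.hyrex.background_tasks:hy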

Scale further with task prioritization, resource-aware scheduling, conditional task execution, or integration with monitoring tools for proactive alerting on task failures.
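
A common first step toward prioritization is routing urgent tasks to a dedicated queue with its own workers, so routine jobs can't starve them. A sketch, assuming the task decorator accepts a queue keyword (another detail to confirm in the Hyrex docs):

@hy.task(queue="critical")  # assumed keyword - verify against the Hyrex docs
def page_oncall(message: str):
    """High-priority alert, kept off the default queue so routine jobs can't delay it"""
    send_email(to="oncall@company.com", subject="ALERT", body=message)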