Background Tasks with Hyrex
Every application needs tasks that run behind the scenes: database backups, report generation, file processing, cleanup jobs. But managing these tasks reliably across different servers and time zones is complex. What if background tasks just worked?
With Hyrex, you can schedule recurring tasks, trigger one-off jobs, and process workloads asynchronously with built-in retry logic and monitoring. Turn unreliable cron jobs and manual processes into robust, observable background workflows that never miss a beat.
Step 1: Define Your Background Tasks
Create Hyrex tasks for all your background operations, from simple file cleanup to complex report generation. These tasks can be triggered on demand or scheduled to run automatically with cron expressions, complete with retry logic and error handling.
from hyrex import HyrexRegistry
from datetime import datetime, timedelta
import hashlib
import os
import subprocess

import requests

# send_email, upload_to_s3, get_file_size_mb, store_daily_metrics, and the
# analytics helpers below are your application's own functions.

hy = HyrexRegistry()

@hy.task
def send_daily_report_email(recipient_email: str, report_data: dict):
    """Send daily analytics report via email"""
    date_str = datetime.now().strftime('%Y-%m-%d')
    email_content = f"Daily Analytics Report - {date_str}\n\n"
    email_content += f"Active Users: {report_data['active_users']}\n"
    email_content += f"Revenue: ${report_data['revenue']:,.2f}\n"
    email_content += f"New Signups: {report_data['new_signups']}\n"
    email_content += f"Support Tickets: {report_data['support_tickets']}\n\n"
    email_content += "View full dashboard: https://yourapp.com/analytics"

    send_email(
        to=recipient_email,
        subject=f"Daily Report - {datetime.now().strftime('%m/%d/%Y')}",
        body=email_content
    )

@hy.task
def backup_database(database_name: str, s3_bucket: str):
    """Create database backup and upload to S3"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_filename = f"{database_name}_{timestamp}.sql"
    backup_path = f"/tmp/{backup_filename}"

    # Create database dump; check=True raises on failure, so a broken dump
    # surfaces as a task error instead of passing silently
    with open(backup_path, "w") as backup_file:
        subprocess.run(["pg_dump", database_name], stdout=backup_file, check=True)

    # Upload to S3
    upload_to_s3(backup_path, s3_bucket, f"backups/{backup_filename}")

    # Measure before removing the local file
    size_mb = get_file_size_mb(backup_path)

    # Clean up local file
    os.remove(backup_path)

    return {"backup_file": backup_filename, "size_mb": size_mb}

@hy.task
def cleanup_old_files(directory: str, days_old: int = 30):
    """Remove files older than specified days"""
    cutoff_date = datetime.now() - timedelta(days=days_old)
    removed_files = []

    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_modified < cutoff_date:
                os.remove(file_path)
                removed_files.append(filename)

    return {"removed_files": removed_files, "count": len(removed_files)}

@hy.task
def process_video_thumbnail(video_url: str, output_bucket: str):
    """Generate video thumbnail and upload to storage"""
    import cv2
    import tempfile

    # Download video to temp file
    video_response = requests.get(video_url)
    video_response.raise_for_status()

    # hashlib gives a stable filename; Python's built-in hash() varies per process
    url_digest = hashlib.sha256(video_url.encode()).hexdigest()[:16]
    thumbnail_filename = f"thumb_{url_digest}.jpg"

    with tempfile.NamedTemporaryFile(suffix='.mp4') as temp_video:
        temp_video.write(video_response.content)
        temp_video.flush()

        # Extract frame at 2 seconds
        cap = cv2.VideoCapture(temp_video.name)
        cap.set(cv2.CAP_PROP_POS_MSEC, 2000)  # 2 seconds
        ret, frame = cap.read()
        cap.release()

        # Fail loudly so the task is marked failed and can be retried
        if not ret:
            raise RuntimeError(f"Could not extract a frame from {video_url}")

        with tempfile.NamedTemporaryFile(suffix='.jpg') as temp_thumb:
            cv2.imwrite(temp_thumb.name, frame)
            upload_to_s3(temp_thumb.name, output_bucket, f"thumbnails/{thumbnail_filename}")

    return {"thumbnail_url": f"s3://{output_bucket}/thumbnails/{thumbnail_filename}"}

@hy.task
def generate_analytics_report():
    """Generate daily analytics data"""
    report_data = {
        "active_users": get_active_users_count(),
        "revenue": get_daily_revenue(),
        "new_signups": get_new_signups_count(),
        "support_tickets": get_open_tickets_count()
    }

    # Store in database
    store_daily_metrics(report_data)

    # Send to stakeholders
    stakeholder_emails = ["ceo@company.com", "cto@company.com", "head-of-product@company.com"]
    for email in stakeholder_emails:
        send_daily_report_email.send(email, report_data)

    return report_data

# Schedule recurring tasks
@hy.cron("0 9 * * *")  # Every day at 9 AM
def daily_tasks():
    """Run daily maintenance tasks"""
    generate_analytics_report.send()
    backup_database.send("production_db", "company-backups")
    cleanup_old_files.send("/app/temp", 7)  # Clean files older than 7 days

@hy.cron("0 2 * * 0")  # Every Sunday at 2 AM
def weekly_tasks():
    """Run weekly maintenance tasks"""
    cleanup_old_files.send("/app/logs", 30)  # Clean logs older than 30 days
    backup_database.send("analytics_db", "company-backups")
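The tasks above use the bare @hy.task decorator and lean on Hyrex's default retry behavior. Where a task needs explicit limits, the decorator is the natural place to configure them; the parameter names in this sketch (max_retries, timeout_seconds) are illustrative assumptions, so check the Hyrex docs for the actual options.

# Sketch only: max_retries and timeout_seconds are assumed parameter names,
# not confirmed Hyrex API.
@hy.task(max_retries=3, timeout_seconds=600)
def backup_database_critical(database_name: str, s3_bucket: str):
    """Same dump-and-upload logic as backup_database, with explicit limits."""
    ...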
Step 2: Build Task Management APIs
Create endpoints to trigger background tasks on-demand and monitor their progress. These APIs let you integrate background processing into your application workflows and provide visibility into task execution and results.
from fastapi import FastAPI
from pydantic import BaseModel

# Import the registry alongside the tasks so we can look up task status
from .tasks import hy, backup_database, cleanup_old_files, process_video_thumbnail

app = FastAPI()

class BackupRequest(BaseModel):
    database_name: str
    s3_bucket: str

class CleanupRequest(BaseModel):
    directory: str
    days_old: int = 30

class VideoProcessRequest(BaseModel):
    video_url: str
    output_bucket: str

@app.post("/tasks/backup")
async def trigger_backup(request: BackupRequest):
    """Trigger database backup"""
    task = backup_database.send(request.database_name, request.s3_bucket)

    return {
        "message": "Backup task started",
        "task_id": task.task_id,
        "database": request.database_name
    }

@app.post("/tasks/cleanup")
async def trigger_cleanup(request: CleanupRequest):
    """Trigger file cleanup"""
    task = cleanup_old_files.send(request.directory, request.days_old)

    return {
        "message": "Cleanup task started",
        "task_id": task.task_id,
        "directory": request.directory
    }

@app.post("/tasks/process-video")
async def process_video(request: VideoProcessRequest):
    """Process video thumbnail generation"""
    task = process_video_thumbnail.send(request.video_url, request.output_bucket)

    return {
        "message": "Video processing started",
        "task_id": task.task_id,
        "video_url": request.video_url
    }

@app.get("/tasks/status/{task_id}")
async def get_task_status(task_id: str):
    """Get status of background task"""
    task = hy.get_task(task_id)

    return {
        "task_id": task_id,
        "status": task.status,
        "progress": task.progress,
        "result": task.result if task.is_complete else None,
        "created_at": task.created_at,
        "completed_at": task.completed_at
    }
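With these endpoints in place, any client or service can kick off work and watch it finish. Here is a minimal polling loop, assuming the API is served at http://localhost:8000 and that the status field eventually reports a terminal value (the exact status strings are an assumption):

import time
import requests

API = "http://localhost:8000"  # wherever the FastAPI app runs (assumption)

# Trigger a backup through the endpoint defined above
response = requests.post(f"{API}/tasks/backup", json={
    "database_name": "production_db",
    "s3_bucket": "company-backups"
})
task_id = response.json()["task_id"]

# Poll the status endpoint until the task reaches a terminal state
while True:
    status = requests.get(f"{API}/tasks/status/{task_id}").json()
    if status["status"] in ("success", "failed"):  # terminal values are assumptions
        print("Backup finished:", status["result"])
        break
    time.sleep(5)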
Your background tasks are now bulletproof!
No more wondering if your backup ran last night or if that cleanup job actually executed. Your background tasks are scheduled reliably, run on distributed workers, and provide full observability into their execution status and results.
Scale further with task prioritization, resource-aware scheduling, conditional task execution, or integration with monitoring tools for proactive alerting on task failures.
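As one small illustration of conditional execution, a scheduled task can check a precondition before enqueueing heavier work. This sketch uses only the .send() API shown above plus the standard library; notify_ops is a hypothetical alerting helper.

import shutil

@hy.cron("0 */6 * * *")  # Every 6 hours
def conditional_backup():
    """Only enqueue a backup when /tmp has room for the dump."""
    free_gb = shutil.disk_usage("/tmp").free / 1e9
    if free_gb > 10:  # 10 GB threshold is an arbitrary example
        backup_database.send("production_db", "company-backups")
    else:
        notify_ops(f"Skipping backup: only {free_gb:.1f} GB free")  # hypothetical helper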
Explore other use cases
AI-Ready Datasets
Build real-time indexes of your data so AI agents have fresh data.
Document Processing
Parse, transform, and analyze documents with parallel processing pipelines.
Ecommerce Checkout Flows
Handle payment processing, inventory updates, and order fulfillment reliably.
User Onboarding
Orchestrate multi-step user setup flows with email notifications and approvals.