from hyrex import HyrexRegistry
from datetime import datetime, timedelta
import requests
import os
hy = HyrexRegistry()
@hy.task
def send_daily_report_email(recipient_email: str, report_data: dict):
"""Send daily analytics report via email"""
date_str = datetime.now().strftime('%Y-%m-%d')
email_content = f"Daily Analytics Report - {date_str}\\n\\n"
email_content += f"Active Users: {report_data['active_users']}\\n"
email_content += f"Revenue: ${report_data['revenue']:,.2f}\\n"
email_content += f"New Signups: {report_data['new_signups']}\\n"
email_content += f"Support Tickets: {report_data['support_tickets']}\\n\\n"
email_content += "View full dashboard: https://yourapp.com/analytics"
send_email(
to=recipient_email,
subject=f"Daily Report - {datetime.now().strftime('%m/%d/%Y')}",
body=email_content
)
@hy.task
def backup_database(database_name: str, s3_bucket: str):
"""Create database backup and upload to S3"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_filename = f"{database_name}_{timestamp}.sql"
# Create database dump
os.system(f"pg_dump {database_name} > /tmp/{backup_filename}")
# Upload to S3
upload_to_s3(f"/tmp/{backup_filename}", s3_bucket, f"backups/{backup_filename}")
# Clean up local file
os.remove(f"/tmp/{backup_filename}")
return {"backup_file": backup_filename, "size_mb": get_file_size_mb(backup_filename)}
@hy.task
def cleanup_old_files(directory: str, days_old: int = 30):
"""Remove files older than specified days"""
cutoff_date = datetime.now() - timedelta(days=days_old)
removed_files = []
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
file_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_modified < cutoff_date:
os.remove(file_path)
removed_files.append(filename)
return {"removed_files": removed_files, "count": len(removed_files)}
@hy.task
def process_video_thumbnail(video_url: str, output_bucket: str):
"""Generate video thumbnail and upload to storage"""
import cv2
import tempfile
# Download video to temp file
video_response = requests.get(video_url)
with tempfile.NamedTemporaryFile(suffix='.mp4') as temp_video:
temp_video.write(video_response.content)
temp_video.flush()
# Extract frame at 2 seconds
cap = cv2.VideoCapture(temp_video.name)
cap.set(cv2.CAP_PROP_POS_MSEC, 2000) # 2 seconds
ret, frame = cap.read()
if ret:
thumbnail_filename = f"thumb_{hash(video_url)}.jpg"
with tempfile.NamedTemporaryFile(suffix='.jpg') as temp_thumb:
cv2.imwrite(temp_thumb.name, frame)
upload_to_s3(temp_thumb.name, output_bucket, f"thumbnails/{thumbnail_filename}")
cap.release()
return {"thumbnail_url": f"s3://{output_bucket}/thumbnails/{thumbnail_filename}"}
@hy.task
def generate_analytics_report():
"""Generate daily analytics data"""
report_data = {
"active_users": get_active_users_count(),
"revenue": get_daily_revenue(),
"new_signups": get_new_signups_count(),
"support_tickets": get_open_tickets_count()
}
# Store in database
store_daily_metrics(report_data)
# Send to stakeholders
stakeholder_emails = ["ceo@company.com", "cto@company.com", "head-of-product@company.com"]
for email in stakeholder_emails:
send_daily_report_email.send(email, report_data)
return report_data
# Schedule recurring tasks
@hy.cron("0 9 * * *") # Every day at 9 AM
def daily_tasks():
"""Run daily maintenance tasks"""
generate_analytics_report.send()
backup_database.send("production_db", "company-backups")
cleanup_old_files.send("/app/temp", 7) # Clean files older than 7 days
@hy.cron("0 2 * * 0") # Every Sunday at 2 AM
def weekly_tasks():
"""Run weekly maintenance tasks"""
cleanup_old_files.send("/app/logs", 30) # Clean logs older than 30 days
backup_database.send("analytics_db", "company-backups")