Schedule recurring tasks
from datetime import datetime

from hyrex import HyrexRegistry

hy = HyrexRegistry()

# NOTE: clean_temp_files() and check_system_health() are not defined in this
# snippet; they stand in for application code you provide.


@hy.task(cron="0 2 * * *")  # Daily at 2 AM
def daily_cleanup():
    """Run the temp-file cleanup and return a summary of the run.

    Returns:
        A dict with a ``cleaned`` flag, the value returned by
        ``clean_temp_files()`` under ``deleted_files``, and an ISO-8601
        ``timestamp`` taken from ``datetime.now()``.
    """
    # Clean up old files
    deleted_count = clean_temp_files()
    return {
        "cleaned": True,
        "deleted_files": deleted_count,
        "timestamp": datetime.now().isoformat(),
    }


@hy.task(cron="*/5 * * * *")  # Every 5 minutes
def health_check():
    """Run the system health check and return its status.

    Returns:
        A dict with the value returned by ``check_system_health()`` under
        ``status`` and an ISO-8601 ``checked_at`` timestamp.
    """
    # Check system health
    status = check_system_health()
    return {"status": status, "checked_at": datetime.now().isoformat()}
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6)
│ │ │ │ │
* * * * *
# Common cron schedules. The functions called inside each task (sync_data,
# generate_report, ...) are not defined in this snippet; they stand in for
# application code you provide.


# Every hour on the hour
@hy.task(cron="0 * * * *")
def hourly_sync():
    """Call ``sync_data()`` at minute 0 of every hour."""
    sync_data()


# Daily at midnight
@hy.task(cron="0 0 * * *")
def daily_report():
    """Call ``generate_report()`` at 00:00 every day."""
    generate_report()


# Weekly on Sunday at midnight
@hy.task(cron="0 0 * * 0")
def weekly_backup():
    """Call ``backup_database()`` at 00:00 on day-of-week 0 (Sunday)."""
    backup_database()


# Every 15 minutes
@hy.task(cron="*/15 * * * *")
def frequent_check():
    """Call ``check_metrics()`` every 15 minutes."""
    check_metrics()


# Weekdays at 9 AM
@hy.task(cron="0 9 * * 1-5")
def business_hours_task():
    """Call ``send_daily_digest()`` at 09:00 on days-of-week 1 through 5."""
    send_daily_digest()


# First day of month at noon
@hy.task(cron="0 12 1 * *")
def monthly_billing():
    """Call ``process_invoices()`` at 12:00 on day 1 of every month."""
    process_invoices()
# A cron-scheduled task is invoked without arguments, so every parameter
# must have a default value.


# ✅ Valid - no arguments
@hy.task(cron="0 * * * *")
def no_args():
    """Run the hourly processing step; takes no parameters."""
    process_hourly_data()
    return {"processed": True}


# ✅ Valid - all arguments have defaults
@hy.task(cron="0 0 * * *")
def with_defaults(days: int = 30, dry_run: bool = False):
    """Run the cleanup using the default values of ``days`` and ``dry_run``.

    Args:
        days: Passed through to ``cleanup_old_files``; defaults to 30.
        dry_run: Passed through to ``cleanup_old_files``; defaults to False.

    Returns:
        A dict echoing the ``days`` and ``dry_run`` values that were used.
    """
    cleanup_old_files(days, dry_run)
    return {"days": days, "dry_run": dry_run}


# ❌ Invalid - has required argument
@hy.task(cron="0 0 * * *")
def invalid(required_arg: str):  # Will raise ValueError
    """Counter-example: a cron task with a parameter that has no default."""
    # This will fail at task registration time
    process(required_arg)
from pydantic import BaseModel


class BackupConfig(BaseModel):
    """Argument schema for the weekly backup workflow."""

    retention_days: int = 30
    compression: str = "gzip"


@hy.task
def backup_database():
    """First workflow step; returns the path of the backup file."""
    # Create database backup
    return {"backup_file": "/tmp/backup.sql"}


@hy.task
def compress_backup():
    """Second workflow step; returns the path of the compressed file."""
    # Compress the backup file
    return {"compressed_file": "/tmp/backup.sql.gz"}


@hy.task
def upload_to_s3():
    """Final workflow step; returns the S3 URL of the uploaded backup."""
    # Upload to S3
    return {"s3_url": "s3://backups/backup.sql.gz"}


@hy.workflow(
    cron="0 3 * * 0",  # Every Sunday at 3 AM
    workflow_arg_schema=BackupConfig,
)
def weekly_backup_workflow():
    """Chain the three backup tasks into a workflow run on a cron schedule."""
    # Define workflow DAG
    backup_database >> compress_backup >> upload_to_s3
Was this page helpful?