Tasks are the fundamental unit of work in Hyrex. They are Python functions that can be executed asynchronously by workers, with automatic retry handling, monitoring, and durability.
from hyrex import get_hyrex_context


@hy.task
def task_with_context(data: dict):
    """Show how a task can inspect the context it is running under.

    Prints the task id and attempt number (plus the parent task id when
    one is set) and returns a dict whose ``"processed_by"`` entry is the
    task id as a string, or ``"unknown"`` when no context is available.
    """
    ctx = get_hyrex_context()

    # Without a context there is nothing to report.
    if not ctx:
        return {"processed_by": "unknown"}

    current_id = ctx.task_id
    attempt = ctx.attempt_number
    queue = ctx.queue  # read only to show the attribute is available
    print(f"Processing task {current_id} (attempt {attempt})")

    # A parent id is only set when this task was spawned as a sub-task.
    if ctx.parent_id:
        print(f"Spawned by task: {ctx.parent_id}")

    # Your task logic here

    return {"processed_by": str(current_id)}
@hy.task(max_retries=3)
def task_with_retries(data: dict):
    """Call the external API and separate retryable from permanent failures.

    A ``TemporaryError`` is re-raised so the failure propagates out of the
    task (which triggers a retry); a ``PermanentError`` is converted into an
    error dict and returned, so the task finishes without retrying.
    """
    try:
        # Attempt operation
        outcome = external_api_call(data)
    except TemporaryError:
        # Propagate unchanged: this will trigger a retry.
        raise
    except PermanentError as exc:
        # Returning normally means this will not retry.
        return {"error": str(exc), "status": "failed"}
    else:
        return outcome
# Enqueue the task; the returned handle is used for all inspection below.
task = process_file.send(context)

# The ID and the task name can be read straight from the handle.
print(f"Task ID: {task.id}")
print(f"Task name: {task.task_name}")

# Reload the task's status from the database before looking at its runs.
task.refresh()

# Each entry in task_runs is one attempt at executing the task.
for attempt in task.task_runs:
    print(f"Attempt {attempt.attempt_number}: {attempt.status}")
    if attempt.status == "completed":
        print(f"Result: {attempt.result}")
class OrderContext(BaseModel):
    """Typed argument model describing a single order."""

    # Identifier of the order.
    order_id: int
    # E-mail address of the customer.
    customer_email: str
    # Order items, one dict per item (the dict schema is not specified here).
    items: List[dict]
Set Appropriate Timeouts
Copy
Ask AI
@hy.task(timeout_seconds=30)  # Fast operations
@hy.task(timeout_seconds=3600)  # Long-running jobs
Design for Idempotency
Copy
Ask AI
@hy.task
def process_payment(context: PaymentContext):
    """Charge a payment at most once per ``payment_id``.

    If a result is already stored for ``context.payment_id`` it is returned
    as-is; otherwise the card is charged, the result is saved under that
    payment id, and the fresh result is returned.
    """
    if not payment_exists(context.payment_id):
        # First time this payment is seen: charge and record the outcome.
        outcome = charge_card(context)
        save_payment_result(context.payment_id, outcome)
        return outcome

    # Already processed: hand back the stored result instead of charging again.
    return get_payment_result(context.payment_id)