Tasks are the fundamental unit of work in Hyrex. They are Python functions that can be executed asynchronously by workers, with automatic retry handling, monitoring, and durability.

Basic Task Definition

Define a task by decorating a function with @hy.task:
from hyrex import HyrexRegistry
from pydantic import BaseModel

hy = HyrexRegistry()

class ProcessContext(BaseModel):
    file_path: str
    options: dict

@hy.task
def process_file(context: ProcessContext):
    # Task logic
    print(f"Processing {context.file_path}")
    return {"status": "completed", "file": context.file_path}
All task parameters must have type hints. Hyrex enforces this requirement to ensure type safety.
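
This page does not show how Hyrex performs that check. The sketch below approximates the idea using only the standard library; it is an assumption about the general approach, not Hyrex's implementation. `missing_type_hints`, `typed`, and `untyped` are hypothetical names introduced for illustration.

```python
import inspect


def missing_type_hints(func):
    """Return the names of parameters that have no type annotation."""
    signature = inspect.signature(func)
    return [
        name
        for name, param in signature.parameters.items()
        if param.annotation is inspect.Parameter.empty
    ]


def typed(context: dict, limit: int = 10):
    return context


def untyped(context, limit: int = 10):
    return context


typed_missing = missing_type_hints(typed)      # nothing is flagged
untyped_missing = missing_type_hints(untyped)  # "context" is flagged
```

Running a check like this in your own test suite can catch an unannotated parameter before the task is registered.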

Task Configuration

Configure task behavior with decorator options:
@hy.task(
    queue="processing",               # Target queue
    max_retries=3,                    # Retry attempts
    timeout_seconds=300,              # Timeout (5 minutes)
    priority=5,                       # 1-10 (higher = more important)
    retry_backoff=lambda n: n * 10    # Backoff strategy (linear)
)
def configured_task(data: dict):
    # Process with specific configuration
    return {"processed": True}

Configuration Options

  • queue: Route tasks to specific worker pools
  • max_retries: Number of retry attempts on failure (default: 0)
  • timeout_seconds: Maximum execution time, in seconds, before the task times out
  • priority: Task priority from 1 to 10; higher values are more important (default: 5)
  • retry_backoff: Function that calculates the delay before each retry
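
The `retry_backoff` in the example above, `lambda n: n*10`, grows linearly. As a rough sketch, an exponential variant with an upper bound might look like the following. It assumes the function receives the attempt number starting at 1 and returns a delay in seconds; this page confirms neither. `exponential_backoff`, `base`, and `cap` are hypothetical names.

```python
def exponential_backoff(attempt: int, base: int = 10, cap: int = 600) -> int:
    """Double the delay on each attempt, never exceeding `cap`."""
    # Assumption: `attempt` starts at 1 and the return value is in seconds.
    return min(cap, base * 2 ** max(attempt - 1, 0))


# Compare the linear strategy from the example with the exponential sketch.
linear = [n * 10 for n in range(1, 6)]
exponential = [exponential_backoff(n) for n in range(1, 6)]
```

If `retry_backoff` accepts any callable of one integer, as the lambda suggests, a function like this could be passed as `retry_backoff=exponential_backoff`. The cap keeps late retries from waiting indefinitely long.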

Sending Tasks

Queue tasks for asynchronous execution:
# Build the task argument once so it can be reused below
context = ProcessContext(
    file_path="/data/input.csv",
    options={"format": "csv"}
)

# Send with default configuration
task = process_file.send(context)

# Override configuration at runtime
urgent_task = process_file.with_config(
    queue="high-priority",
    max_retries=5
).send(context)

# Get task ID
print(f"Task ID: {task.id}")

Task Context

Access metadata about the current task execution:
from hyrex import get_hyrex_context

@hy.task
def task_with_context(data: dict):
    # Get current task context
    context = get_hyrex_context()
    
    if context:
        task_id = context.task_id
        attempt_number = context.attempt_number
        queue_name = context.queue
        
        print(f"Processing task {task_id} (attempt {attempt_number})")
        
        # Access parent task info if this is a sub-task
        if context.parent_id:
            print(f"Spawned by task: {context.parent_id}")
    
    # Your task logic here
    return {"processed_by": str(task_id) if context else "unknown"}

Error Handling

Tasks can raise exceptions to trigger retries:
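@hy.task(max_retries=3)
def task_with_retries(data: dict):
    # external_api_call, TemporaryError, and PermanentError are placeholders
    # for your own client function and exception types.
    try:
        # Attempt operation
        result = external_api_call(data)
        return result
    except TemporaryError:
        # Re-raising lets the exception propagate, which triggers a retry
        raise
    except PermanentError as e:
        # Returning a value completes the task, so no retry is scheduled
        return {"error": str(e), "status": "failed"}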

Error Callbacks

Use on_error to handle errors without affecting retry behavior:
def error_handler(e: Exception):
    # Log error, send alert, etc.
    print(f"Task error: {type(e).__name__}: {e}")

@hy.task(
    max_retries=3,
    on_error=error_handler
)
def monitored_task(data: dict):
    # Task logic that might fail
    result = risky_operation(data)
    return result

Retry Behavior

  • Raising any exception triggers a retry (if retries remain)
  • Returning a value completes the task, even if that value reports an error
  • Use retry_backoff for exponential or custom backoff
  • on_error callbacks run on each failure but don’t affect retries
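
The rules above can be modelled with a small loop in plain Python. This is a toy model for reasoning about the behavior, not how Hyrex workers are implemented. It assumes one initial attempt plus `max_retries` further attempts, and it records backoff delays instead of sleeping. `run_with_retries`, `flaky`, and the `"completed"`/`"failed"` outcome strings are hypothetical.

```python
def run_with_retries(func, arg, max_retries=0, on_error=None, retry_backoff=None):
    """Toy model: call func until it returns or retries are exhausted."""
    delays = []
    # Assumption: one initial attempt plus up to `max_retries` retries.
    for attempt in range(max_retries + 1):
        try:
            return {"status": "completed", "result": func(arg), "delays": delays}
        except Exception as exc:
            if on_error is not None:
                on_error(exc)  # runs on every failure, never changes the outcome
            if attempt < max_retries and retry_backoff is not None:
                delays.append(retry_backoff(attempt + 1))  # recorded, not slept
    return {"status": "failed", "result": None, "delays": delays}


calls = {"count": 0}
errors = []


def flaky(data):
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"processed": data}


outcome = run_with_retries(
    flaky,
    "payload",
    max_retries=3,
    on_error=errors.append,
    retry_backoff=lambda n: n * 10,
)
```

In this run the function fails twice, the error callback fires twice, and the third attempt completes the task with one retry left unused.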

Task Results

Track task execution:
# Send a task
task = process_file.send(context)

# Task ID is immediately available
print(f"Task ID: {task.id}")

# Get task name
print(f"Task name: {task.task_name}")

# Refresh task status from database
task.refresh()

# Access task runs (attempts)
for run in task.task_runs:
    print(f"Attempt {run.attempt_number}: {run.status}")
    if run.status == "completed":
        print(f"Result: {run.result}")
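
When only the final outcome matters, the loop above can be folded into a helper. The sketch below runs without Hyrex by using a stand-in record type with the same three attributes the loop reads (`attempt_number`, `status`, `result`). `RunRecord` and `completed_result` are hypothetical, and the `"failed"` status string is an assumption; only `"completed"` appears on this page.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Optional


@dataclass
class RunRecord:
    """Hypothetical stand-in for one entry of task.task_runs."""
    attempt_number: int
    status: str
    result: Any = None


def completed_result(runs: Iterable[RunRecord]) -> Optional[Any]:
    """Return the result of the highest-numbered completed run, if any."""
    completed = [run for run in runs if run.status == "completed"]
    if not completed:
        return None
    return max(completed, key=lambda run: run.attempt_number).result


runs = [
    RunRecord(attempt_number=1, status="failed"),  # assumed status value
    RunRecord(attempt_number=2, status="completed", result={"status": "completed"}),
]
final = completed_result(runs)
```

With a real task, the same helper could presumably be called as `completed_result(task.task_runs)` after `task.refresh()`, provided the run objects expose those attributes as shown above.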

Best Practices

  1. Use Type-Safe Contexts
    from typing import List

    class OrderContext(BaseModel):
        order_id: int
        customer_email: str
        items: List[dict]
    
  2. Set Appropriate Timeouts
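    @hy.task(timeout_seconds=30)  # Fast operations
    def quick_lookup(key: str): ...  # hypothetical task
    @hy.task(timeout_seconds=3600)  # Long-running jobs
    def nightly_export(day: str): ...  # hypothetical task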
    
  3. Design for Idempotency
    @hy.task
    def process_payment(context: PaymentContext):
        # Check if already processed
        if payment_exists(context.payment_id):
            return get_payment_result(context.payment_id)
        
        # Process payment
        result = charge_card(context)
        save_payment_result(context.payment_id, result)
        return result
    
  4. Handle Errors Gracefully
    @hy.task(max_retries=3)
    def fetch_data(context: FetchContext):
        try:
            return fetch_from_api(context.url)
        except RateLimitError:
            # Retry with backoff
            raise
        except InvalidDataError as e:
            # Don't retry, return error
            return {"error": str(e), "status": "invalid_data"}
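
The idempotency pattern from item 3 can be tried out without a worker. The sketch below fills in the helper functions with in-memory stand-ins, so every name and signature here is hypothetical. A real implementation would use a durable store, and would need a unique constraint or a transaction because checking and then writing is not atomic on its own.

```python
_payments = {}  # in-memory stand-in for a durable store
charges = []    # records every simulated card charge


def payment_exists(payment_id: str) -> bool:
    return payment_id in _payments


def get_payment_result(payment_id: str) -> dict:
    return _payments[payment_id]


def save_payment_result(payment_id: str, result: dict) -> None:
    _payments[payment_id] = result


def charge_card(payment_id: str, amount: int) -> dict:
    charges.append((payment_id, amount))  # the side effect we must not repeat
    return {"payment_id": payment_id, "amount": amount, "status": "charged"}


def process_payment(payment_id: str, amount: int) -> dict:
    # Return the stored result if this payment was already processed
    if payment_exists(payment_id):
        return get_payment_result(payment_id)
    result = charge_card(payment_id, amount)
    save_payment_result(payment_id, result)
    return result


first = process_payment("pay-1", 500)
second = process_payment("pay-1", 500)  # simulates a retried task
```

The second call returns the stored result and the card is charged only once, which is the property a retried task relies on.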
    

Next Steps