Monitor task execution, track performance, and debug issues using Hyrex’s built-in monitoring capabilities.

Hyrex Cloud Dashboard

Access your monitoring dashboard at hyrex.io/cloud.

Programmatic Monitoring

Task Context

Access execution context within tasks:
from hyrex import get_hyrex_context

@hy.task(max_retries=3)
def monitored_task(data: dict):
    context = get_hyrex_context()
    
    if context:
        # Log task metadata
        print(f"Task ID: {context.task_id}")
        print(f"Task Name: {context.task_name}")
        print(f"Queue: {context.queue}")
        print(f"Attempt: {context.attempt_number} of {context.max_retries + 1}")
        print(f"Started: {context.started}")
        
        # Track parent-child relationships
        if context.parent_id:
            print(f"Spawned by: {context.parent_id}")
        
        # Monitor workflow execution
        if context.workflow_run_id:
            print(f"Part of workflow: {context.workflow_run_id}")
    
    # Your task logic
    return process(data)

Workflow Context

Inspect a workflow run and check the status of each task within it:
from hyrex import get_hyrex_workflow_context

@hy.task
def workflow_monitor():
    context = get_hyrex_workflow_context()
    
    if context:
        print(f"Workflow Run ID: {context.workflow_run_id}")
        
        # Check other task statuses
        for task_name, durable_run in context.durable_runs.items():
            durable_run.refresh()
            
            for run in durable_run.task_runs:
                print(f"{task_name}: {run.status} (attempt {run.attempt_number})")
                
                if run.status == "failed":
                    print(f"  Error: {run.error}")
                elif run.status == "completed":
                    print(f"  Result: {run.result}")

Task Tracking

Track task execution after sending:
# Send task and track execution
task = process_data.send({"file": "data.csv"})
print(f"Task ID: {task.id}")

# Refresh to get latest status
task.refresh()

# Check all attempts
for run in task.task_runs:
    print(f"Attempt {run.attempt_number}:")
    print(f"  Status: {run.status}")
    print(f"  Started: {run.started}")
    print(f"  Completed: {run.completed}")
    
    if run.status == "completed":
        print(f"  Result: {run.result}")
    elif run.status == "failed":
        print(f"  Error: {run.error}")

Custom Metrics

Use HyrexKV to track custom metrics:
from hyrex import HyrexKV, get_hyrex_context
import json
from datetime import datetime

@hy.task
def task_with_metrics(data: dict):
    start_time = datetime.now()
    
    try:
        # Process data
        result = process(data)
        
        # Track success metrics
        metrics = {
            "status": "success",
            "duration": (datetime.now() - start_time).total_seconds(),
            "timestamp": datetime.now().isoformat(),
            "items_processed": len(result)
        }
        
    except Exception as e:
        # Track failure metrics
        metrics = {
            "status": "error",
            "error": str(e),
            "duration": (datetime.now() - start_time).total_seconds(),
            "timestamp": datetime.now().isoformat()
        }
        raise
    
    finally:
        # Store metrics
        context = get_hyrex_context()
        task_name = context.task_name if context else "unknown"
        HyrexKV.set(
            f"metrics:{task_name}:{datetime.now().timestamp()}",
            json.dumps(metrics)
        )
    
    return result
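
Reading metrics back requires the keys you wrote. A minimal sketch, assuming HyrexKV.get mirrors the HyrexKV.set call above; the key shown is purely illustrative, and in practice you would record keys as you write them:
from hyrex import HyrexKV
import json

def load_metrics(key: str) -> dict:
    """Fetch and decode one metrics entry written by task_with_metrics."""
    raw = HyrexKV.get(key)  # assumed counterpart to the HyrexKV.set call above
    return json.loads(raw) if raw else {}

# Illustrative key; substitute one recorded at write time
metrics = load_metrics("metrics:task_with_metrics:1700000000.0")
print(metrics.get("status"), metrics.get("duration"))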

Error Tracking

Register an on_error callback to capture details of each failure:
from datetime import datetime
import json

from hyrex import HyrexKV, get_hyrex_context

def error_tracker(e: Exception):
    """Track errors for monitoring"""
    context = get_hyrex_context()
    
    if context:
        error_data = {
            "task_id": str(context.task_id),
            "task_name": context.task_name,
            "error_type": type(e).__name__,
            "error_message": str(e),
            "attempt": context.attempt_number,
            "timestamp": datetime.now().isoformat()
        }
        
        # Store error for analysis
        HyrexKV.set(
            f"errors:{context.task_name}:{context.task_id}",
            json.dumps(error_data)
        )
        
        # Alert on final retry
        if context.attempt_number == context.max_retries:
            send_alert(f"Task {context.task_name} failed after all retries: {str(e)}")

@hy.task(
    max_retries=3,
    on_error=error_tracker
)
def reliable_task(data: dict):
    return process_with_monitoring(data)
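
send_alert above is a placeholder. One minimal sketch posts the message to a webhook; the URL is a stand-in for whatever alerting endpoint or client you actually use:
import json
import urllib.request

# Placeholder endpoint; swap in your own webhook or alerting client
ALERT_WEBHOOK_URL = "https://example.com/alert-webhook"

def send_alert(message: str):
    """Post an alert message to the configured webhook."""
    payload = json.dumps({"text": message}).encode("utf-8")
    request = urllib.request.Request(
        ALERT_WEBHOOK_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request, timeout=10)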

Best Practices

  1. Use structured logging with task context for easier debugging (see the sketch after this list)
  2. Track custom metrics for business-specific monitoring
  3. Set up alerts for critical task failures
  4. Monitor queue depths to identify bottlenecks
  5. Track task duration to identify performance issues
  6. Use correlation IDs to trace requests across tasks
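
For item 1, here is a structured-logging sketch that attaches the current task context to every log line. It uses the standard library logging module and get_hyrex_context from the examples above; the field names are illustrative:
import logging
from hyrex import get_hyrex_context

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("hyrex.tasks")

def log_with_context(message: str, **fields):
    """Emit a log line enriched with the current Hyrex task context."""
    context = get_hyrex_context()
    if context:
        fields.update(
            task_id=str(context.task_id),
            task_name=context.task_name,
            queue=context.queue,
            attempt=context.attempt_number,
        )
    logger.info("%s %s", message, fields)

# Inside a task:
# log_with_context("processing started", file="data.csv")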

Next Steps