Documentation Index: Fetch the complete documentation index at https://hyrex.io/docs/llms.txt.
Use this file to discover all available pages before exploring further.
Monitor task execution, track performance, and debug issues using Hyrex’s built-in monitoring capabilities.
Hyrex Cloud Dashboard: Access your monitoring dashboard at hyrex.io/cloud. Hyrex Studio: A web UI for monitoring tasks and workflows. Start Studio with: hyrex studio
# Custom port
hyrex studio --port 8080
# Verbose logging
hyrex studio --verbose
Access at https://local.hyrex.studio. Features:
Task status and history
Queue depths
Worker health
Error tracking
Workflow visualization
Environment Variables # Required
HYREX_DATABASE_URL = postgresql://user:pass@localhost/hyrex
# Optional
STUDIO_PORT = 1337
STUDIO_VERBOSE = true
Programmatic Monitoring
Task Context
Access execution context within tasks:
from hyrex import get_hyrex_context

@hy.task(max_retries=3)
def monitored_task(data: dict):
    """Run the task's logic while logging Hyrex execution metadata.

    Reads the current Hyrex task context (task id/name, queue, attempt
    counters, parent and workflow links) and prints it before executing
    the actual task body. If no context is available, the logging is
    skipped and the task logic still runs.
    """
    context = get_hyrex_context()
    if context:
        # Log task metadata
        print(f"Task ID: {context.task_id}")
        print(f"Task Name: {context.task_name}")
        print(f"Queue: {context.queue}")
        # Total attempts = max_retries + 1 (the initial try plus retries)
        print(f"Attempt: {context.attempt_number} of {context.max_retries + 1}")
        print(f"Started: {context.started}")

        # Track parent-child relationships
        if context.parent_id:
            print(f"Spawned by: {context.parent_id}")

        # Monitor workflow execution
        if context.workflow_run_id:
            print(f"Part of workflow: {context.workflow_run_id}")

    # Your task logic
    return process(data)
Workflow Context
Monitor workflow execution and task dependencies:
from hyrex import get_hyrex_workflow_context

@hy.task
def workflow_monitor():
    """Report the status of every task run in the current workflow.

    Refreshes each durable run to pull its latest state, then prints the
    status and attempt number of every task run, along with the error for
    failed runs or the result for completed ones. Does nothing when no
    workflow context is available.
    """
    context = get_hyrex_workflow_context()
    if context:
        print(f"Workflow Run ID: {context.workflow_run_id}")

        # Check other task statuses
        for task_name, durable_run in context.durable_runs.items():
            durable_run.refresh()  # pull latest state before reading it
            for run in durable_run.task_runs:
                print(f"{task_name}: {run.status} (attempt {run.attempt_number})")
                if run.status == "failed":
                    print(f"  Error: {run.error}")
                elif run.status == "completed":
                    print(f"  Result: {run.result}")
Task Tracking
Track task execution after sending:
# Send a task and track its execution afterwards.
task = process_data.send({"file": "data.csv"})
print(f"Task ID: {task.id}")

# Refresh to get the latest status
task.refresh()

# Check all attempts: each entry in task_runs is one execution attempt
for run in task.task_runs:
    print(f"Attempt {run.attempt_number}:")
    print(f"  Status: {run.status}")
    print(f"  Started: {run.started}")
    print(f"  Completed: {run.completed}")
    if run.status == "completed":
        print(f"  Result: {run.result}")
    elif run.status == "failed":
        print(f"  Error: {run.error}")
Custom Metrics
Use HyrexKV to track custom metrics:
from hyrex import HyrexKV, get_hyrex_context
import json
from datetime import datetime

@hy.task
def task_with_metrics(data: dict):
    """Process *data* and record success/failure metrics in HyrexKV.

    On success the metrics include the number of items processed; on
    failure they record the error message and the exception is re-raised.
    Metrics are written in the ``finally`` block so they are stored
    whether or not the task raises.
    """
    start_time = datetime.now()
    try:
        # Process data
        result = process(data)

        # Track success metrics
        metrics = {
            "status": "success",
            "duration": (datetime.now() - start_time).total_seconds(),
            "timestamp": datetime.now().isoformat(),
            "items_processed": len(result),
        }
    except Exception as e:
        # Track failure metrics
        metrics = {
            "status": "error",
            "error": str(e),
            "duration": (datetime.now() - start_time).total_seconds(),
            "timestamp": datetime.now().isoformat(),
        }
        raise
    finally:
        # Store metrics under a timestamped key.
        # Call get_hyrex_context() once instead of twice.
        context = get_hyrex_context()
        task_name = context.task_name if context else "unknown"
        HyrexKV.set(
            f"metrics:{task_name}:{datetime.now().timestamp()}",
            json.dumps(metrics),
        )
    return result
Error Tracking
Implement comprehensive error tracking:
def error_tracker(e: Exception):
    """Record *e* in HyrexKV for later analysis and alert on the last retry.

    Intended as a Hyrex ``on_error`` callback: stores the task id/name,
    exception type and message, attempt number, and a timestamp. Does
    nothing when no task context is available.
    """
    context = get_hyrex_context()
    if context:
        error_data = {
            "task_id": str(context.task_id),
            "task_name": context.task_name,
            "error_type": type(e).__name__,
            "error_message": str(e),
            "attempt": context.attempt_number,
            "timestamp": datetime.now().isoformat(),
        }

        # Store error for analysis
        HyrexKV.set(
            f"errors:{context.task_name}:{context.task_id}",
            json.dumps(error_data),
        )

        # Alert on final retry.
        # NOTE(review): elsewhere these docs show attempts running from 1 to
        # max_retries + 1, so the final attempt may be max_retries + 1 rather
        # than max_retries — confirm against Hyrex's retry semantics.
        if context.attempt_number == context.max_retries:
            send_alert(
                f"Task {context.task_name} failed after all retries: {str(e)}"
            )
@hy.task(
    max_retries=3,
    on_error=error_tracker,
)
def reliable_task(data: dict):
    """Task whose failures are recorded via the ``on_error`` callback."""
    return process_with_monitoring(data)
Best Practices
Use structured logging with task context for easier debugging
Track custom metrics for business-specific monitoring
Set up alerts for critical task failures
Monitor queue depths to identify bottlenecks
Track task duration to identify performance issues
Use correlation IDs to trace requests across tasks
Next Steps
Retries Configure retry strategies
Examples See monitoring patterns in action