Skip to main content

Documentation Index

Fetch the complete documentation index at: https://hyrex.io/docs/llms.txt

Use this file to discover all available pages before exploring further.

Build resilient systems with comprehensive error handling, intelligent retry mechanisms, circuit breakers, and detailed error monitoring.

Overview

This example demonstrates robust error handling patterns:
  • Comprehensive error classification - Differentiate between retriable and non-retriable errors
  • Intelligent retry strategies - Exponential backoff, circuit breakers, and custom retry logic
  • Error monitoring & alerting - Track error rates, patterns, and system health
  • Graceful degradation - Fallback mechanisms when services are unavailable
  • Error aggregation - Collect and analyze error patterns for improvement
  • Recovery workflows - Automatic and manual recovery processes
Perfect for building fault-tolerant systems, API integrations, or any application requiring high reliability.

Task Definitions

from hyrex import HyrexRegistry, HyrexKV, get_hyrex_context
from pydantic import BaseModel
from typing import Dict, Any, Optional, List
import traceback
import logging
import time
import json
from datetime import datetime, timedelta
from enum import Enum

# Module-level singletons shared by every task in this example:
# the Hyrex task registry and the application logger.
hy = HyrexRegistry()
logger = logging.getLogger(__name__)

class ErrorSeverity(Enum):
    """Severity levels for classified task errors, least to most urgent.

    CRITICAL severity additionally triggers an immediate alert from
    monitored_task's generic error handler.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class ErrorCategory(Enum):
    """Broad buckets used to classify where a task error originated."""

    VALIDATION = "validation"
    NETWORK = "network"
    DATABASE = "database"
    EXTERNAL_API = "external_api"
    BUSINESS_LOGIC = "business_logic"
    SYSTEM = "system"

class MonitoredTaskContext(BaseModel):
    """Input payload for monitored_task."""

    # Name of the operation to run (dispatched by perform_operation).
    operation: str
    # Arbitrary payload handed to the operation.
    data: Dict[str, Any]
    # Optional retry overrides — NOTE(review): not read by monitored_task
    # in this file; confirm the intended consumer.
    retry_config: Optional[Dict[str, Any]] = None
    # When set, monitored_task gates on and updates the circuit breaker
    # stored under this key.
    circuit_breaker_key: Optional[str] = None

class TaskError(Exception):
    """Base application error carrying classification metadata.

    Attributes:
        category: ErrorCategory bucket describing the failure's origin.
        severity: ErrorSeverity used for alerting decisions.
        retriable: Whether the task framework should retry on this error.
        details: Free-form extra context for logging/analysis.
    """

    def __init__(self, message: str, category: ErrorCategory, severity: ErrorSeverity,
                 retriable: bool = True, details: Optional[Dict] = None):
        super().__init__(message)
        self.details = details or {}
        self.retriable = retriable
        self.severity = severity
        self.category = category

class ValidationError(TaskError):
    """Non-retriable TaskError raised when input data fails validation."""

    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(
            message,
            ErrorCategory.VALIDATION,
            ErrorSeverity.MEDIUM,
            False,  # validation failures never succeed on retry
            details,
        )

class ExternalAPIError(TaskError):
    """TaskError for upstream HTTP failures, classified by status code."""

    def __init__(self, message: str, status_code: int, details: Optional[Dict] = None):
        # 5xx means the provider itself is unhealthy -> HIGH; otherwise MEDIUM.
        is_server_error = status_code >= 500
        severity = ErrorSeverity.HIGH if is_server_error else ErrorSeverity.MEDIUM
        # Retry on server errors plus rate limits (429) and request timeouts (408).
        retriable = is_server_error or status_code in (429, 408)
        merged_details = {**(details or {}), "status_code": status_code}
        super().__init__(message, ErrorCategory.EXTERNAL_API, severity, retriable, merged_details)

@hy.task(max_retries=5, retry_backoff=True)
def monitored_task(context: MonitoredTaskContext):
    """Run an operation with circuit-breaker gating, metrics, and error classification.

    Args:
        context: Operation name, payload, and optional circuit-breaker key.

    Returns:
        Dict with success flag plus either the operation result or a
        structured error description (for non-retriable failures).

    Raises:
        TaskError: When the configured circuit breaker is OPEN.
        Exception: Retriable errors are re-raised so Hyrex can retry them.
    """

    start_time = time.time()
    task_context = get_hyrex_context()
    operation = context.operation

    # Fail fast (and non-retriably) when the breaker for this dependency is open.
    if context.circuit_breaker_key:
        circuit_state = check_circuit_breaker(context.circuit_breaker_key)
        if circuit_state == "OPEN":
            raise TaskError(
                f"Circuit breaker OPEN for {context.circuit_breaker_key}",
                ErrorCategory.SYSTEM,
                ErrorSeverity.HIGH,
                retriable=False
            )

    try:
        # Track task execution metrics
        update_task_metrics(operation, "started")

        # Perform the actual operation
        result = perform_operation(context.operation, context.data)

        # Record success metrics
        duration = time.time() - start_time
        record_task_success(operation, duration, task_context)

        # A success closes the breaker again.
        if context.circuit_breaker_key:
            reset_circuit_breaker(context.circuit_breaker_key)

        return {
            "success": True,
            "result": result,
            "duration": duration,
            "operation": operation
        }

    except ValidationError as e:
        # Validation failures are permanent: log and return a structured
        # error response instead of letting Hyrex retry.
        error_details = log_task_error(operation, e, task_context, retriable=False)
        return {
            "success": False,
            "error_type": "validation_error",
            "error_message": str(e),
            "details": e.details,
            "error_id": error_details["error_id"]
        }

    except ExternalAPIError as e:
        # Upstream failures count against the circuit breaker.
        if context.circuit_breaker_key:
            record_circuit_breaker_failure(context.circuit_breaker_key)

        error_details = log_task_error(operation, e, task_context, retriable=e.retriable)

        if e.retriable:
            # Fix: bare raise preserves the original traceback for Hyrex.
            raise
        return {
            "success": False,
            "error_type": "external_api_error",
            "error_message": str(e),
            "status_code": e.details.get("status_code"),
            "error_id": error_details["error_id"]
        }

    except Exception as e:
        # Fix: honour the exception's own retriable flag when present —
        # the previous code logged retriable=True even for non-retriable
        # TaskError subclasses.
        error_details = log_task_error(
            operation, e, task_context, retriable=getattr(e, "retriable", True)
        )

        # Record for circuit breaker if configured
        if context.circuit_breaker_key:
            record_circuit_breaker_failure(context.circuit_breaker_key)

        # Escalate critical errors immediately.
        if isinstance(e, TaskError) and e.severity == ErrorSeverity.CRITICAL:
            send_critical_error_alert.send(operation, str(e), error_details)

        raise  # preserve original traceback; Hyrex handles retries

@hy.task(max_retries=3)
def resilient_external_api_call(url: str, method: str = "GET", data: Optional[Dict] = None, 
                               timeout: int = 30, circuit_breaker_key: Optional[str] = None):
    """Make an external HTTP request with retries and circuit-breaker protection.

    Args:
        url: Target URL.
        method: HTTP method (default "GET").
        data: Optional JSON request body.
        timeout: Per-request timeout in seconds.
        circuit_breaker_key: When set, the call is blocked while the breaker
            is OPEN and failures/successes update the breaker state.

    Returns:
        Dict with status_code, parsed data (JSON when the response declares
        application/json, raw text otherwise) and response headers.

    Raises:
        ExternalAPIError: On HTTP >= 400, timeouts, connection errors, or any
            other request failure (with a mapped pseudo status code).
    """

    import requests
    from requests.adapters import HTTPAdapter
    # Fix: import Retry from urllib3 directly — requests.packages.urllib3 is
    # a legacy alias that has been removed in newer requests/urllib3 releases.
    from urllib3.util.retry import Retry

    # Check circuit breaker before doing any network work.
    if circuit_breaker_key:
        circuit_state = check_circuit_breaker(circuit_breaker_key)
        if circuit_state == "OPEN":
            raise ExternalAPIError("Service unavailable - circuit breaker OPEN", 503)

    # Configure transport-level retries for idempotent methods only.
    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        # Fix: 'method_whitelist' was renamed to 'allowed_methods' in
        # urllib3 1.26 and removed entirely in urllib3 2.0.
        allowed_methods=["HEAD", "GET", "OPTIONS"],
        backoff_factor=1
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    try:
        response = session.request(
            method=method,
            url=url,
            json=data,
            timeout=timeout,
            headers={"User-Agent": "HyrexApp/1.0"}
        )

        # Treat any 4xx/5xx as an error; include a body excerpt for debugging.
        if response.status_code >= 400:
            raise ExternalAPIError(
                f"API request failed with status {response.status_code}",
                response.status_code,
                {"url": url, "response_body": response.text[:1000]}
            )

        # Success closes the breaker again.
        if circuit_breaker_key:
            reset_circuit_breaker(circuit_breaker_key)

        return {
            "status_code": response.status_code,
            "data": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text,
            "headers": dict(response.headers)
        }

    except requests.exceptions.Timeout:
        if circuit_breaker_key:
            record_circuit_breaker_failure(circuit_breaker_key)
        raise ExternalAPIError("Request timeout", 408, {"url": url, "timeout": timeout})

    except requests.exceptions.ConnectionError as e:
        if circuit_breaker_key:
            record_circuit_breaker_failure(circuit_breaker_key)
        raise ExternalAPIError("Connection failed", 503, {"url": url, "error": str(e)})

    except requests.exceptions.RequestException as e:
        if circuit_breaker_key:
            record_circuit_breaker_failure(circuit_breaker_key)
        raise ExternalAPIError(f"Request failed: {str(e)}", 500, {"url": url})

    finally:
        # Fix: release pooled connections — the session was previously leaked.
        session.close()

@hy.task
def error_recovery_workflow(error_id: str, recovery_strategy: str):
    """Attempt to recover from a previously logged error.

    Args:
        error_id: ID of the stored error record (as produced by log_task_error).
        recovery_strategy: One of "retry_with_fallback", "manual_intervention"
            or "alternative_workflow".

    Returns:
        Dict describing the recovery outcome.

    Raises:
        ValueError: If the error record does not exist.
        TaskError: If the chosen recovery step fails; the original exception
            is chained as __cause__.
    """

    # Look up the stored error record before attempting anything.
    error_details = get_error_details(error_id)
    if not error_details:
        raise ValueError(f"Error {error_id} not found")

    try:
        if recovery_strategy == "retry_with_fallback":
            # Re-run the original operation with safe fallback data.
            fallback_data = get_fallback_data(error_details["operation"])
            result = perform_operation(error_details["operation"], fallback_data)

        elif recovery_strategy == "manual_intervention":
            # Escalate to a human via a support ticket.
            ticket_id = create_support_ticket(error_details)
            result = {"ticket_id": ticket_id, "status": "manual_intervention_required"}

        elif recovery_strategy == "alternative_workflow":
            # Run the pre-defined alternative path for this operation.
            result = execute_alternative_workflow(error_details)

        else:
            raise ValueError(f"Unknown recovery strategy: {recovery_strategy}")

        # Record the successful recovery against the original error.
        mark_error_recovered(error_id, recovery_strategy, result)

        return {
            "error_id": error_id,
            "recovery_strategy": recovery_strategy,
            "recovered": True,
            "result": result
        }

    except Exception as e:
        # Recovery failed: record it, then surface a classified system error.
        mark_recovery_failed(error_id, recovery_strategy, str(e))
        # Fix: chain the original exception so the root cause is preserved
        # in tracebacks and error reports.
        raise TaskError(
            f"Recovery failed for error {error_id}: {str(e)}",
            ErrorCategory.SYSTEM,
            ErrorSeverity.HIGH
        ) from e

def log_task_error(operation: str, error: Exception, task_context: Any, retriable: bool = True) -> Dict:
    """Persist a structured error record, update metrics, and log it.

    Args:
        operation: Name of the operation that failed.
        error: The caught exception.
        task_context: Hyrex task context (task_id / attempt_number read via getattr).
        retriable: Whether the caller intends to retry.

    Returns:
        The stored record, including its generated "error_id".
    """

    record_id = f"err_{int(time.time())}_{hash(str(error)) % 10000}"

    record = {
        "error_id": record_id,
        "operation": operation,
        "error_type": type(error).__name__,
        "error_message": str(error),
        "retriable": retriable,
        "timestamp": datetime.now().isoformat(),
        "task_id": getattr(task_context, 'task_id', 'unknown'),
        "attempt_number": getattr(task_context, 'attempt_number', 0),
        "traceback": traceback.format_exc(),
    }

    # Classified errors carry extra metadata worth persisting.
    if isinstance(error, TaskError):
        record["category"] = error.category.value
        record["severity"] = error.severity.value
        record["details"] = error.details

    # Keep the record for a week so it can be inspected and recovered.
    HyrexKV.set(
        f"error:{record_id}",
        json.dumps(record),
        expiry_seconds=604800  # 7 days
    )

    # Roll the error into the per-operation metrics.
    update_error_metrics(operation, record)

    # Log to application logs
    logger.error(f"Task error in {operation}: {record}")

    return record

def update_error_metrics(operation: str, error_data: Dict):
    """Aggregate per-operation error counters in HyrexKV.

    Tracks total errors, counts by error type and (when classified) by
    category, plus hourly buckets for trend analysis. Best-effort: a corrupt
    stored blob resets the counters rather than failing the logging path.

    Args:
        operation: Operation name the error belongs to.
        error_data: Record produced by log_task_error.
    """

    error_key = f"error_metrics:{operation}"

    # Single source of truth for the empty-metrics shape.
    empty_metrics: Dict = {
        "total_errors": 0,
        "error_types": {},
        "error_categories": {},
        "hourly_counts": {}
    }

    try:
        metrics_data = HyrexKV.get(error_key)
        metrics = json.loads(metrics_data) if metrics_data else empty_metrics
    except Exception:
        # Fix: was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit. Corrupt/unreadable metrics start from a clean slate.
        metrics = empty_metrics

    # Update counters
    metrics["total_errors"] += 1
    error_type = error_data["error_type"]
    metrics["error_types"][error_type] = metrics["error_types"].get(error_type, 0) + 1

    if "category" in error_data:
        category = error_data["category"]
        metrics["error_categories"][category] = metrics["error_categories"].get(category, 0) + 1

    # Update hourly counts for trend analysis
    hour_key = datetime.now().strftime("%Y-%m-%d-%H")
    metrics["hourly_counts"][hour_key] = metrics["hourly_counts"].get(hour_key, 0) + 1

    # Store updated metrics
    HyrexKV.set(error_key, json.dumps(metrics), expiry_seconds=2592000)  # 30 days

def check_circuit_breaker(key: str) -> str:
    """Return the circuit-breaker state for *key*: "CLOSED", "OPEN" or "HALF_OPEN".

    Fails open: if the state cannot be read or parsed, "CLOSED" is returned so
    traffic is allowed rather than blocked by a broken breaker record.
    """

    cb_key = f"circuit_breaker:{key}"

    try:
        cb_data = HyrexKV.get(cb_key)
        if not cb_data:
            return "CLOSED"

        cb_state = json.loads(cb_data)

        # After the cooldown period an OPEN breaker transitions to HALF_OPEN
        # so a probe request can test the dependency again.
        if cb_state["state"] == "OPEN":
            if time.time() - cb_state["opened_at"] > cb_state.get("timeout", 60):
                cb_state["state"] = "HALF_OPEN"
                HyrexKV.set(cb_key, json.dumps(cb_state), expiry_seconds=3600)

        return cb_state["state"]

    except Exception:
        # Fix: was a bare `except:` (swallowed KeyboardInterrupt/SystemExit)
        # and hid failures entirely. Still fail open, but log the problem.
        logger.exception(f"Failed to read circuit breaker state for {key}")
        return "CLOSED"

def record_circuit_breaker_failure(key: str):
    """Increment the failure counter for *key*, opening the breaker at threshold.

    Best-effort: any failure to read/write breaker state is logged and
    otherwise ignored so it cannot mask the original task error.
    """

    cb_key = f"circuit_breaker:{key}"

    try:
        stored = HyrexKV.get(cb_key)
        state = json.loads(stored) if stored else {
            "failures": 0,
            "state": "CLOSED",
            "threshold": 5,
            "timeout": 60,
        }

        state["failures"] += 1
        state["last_failure"] = time.time()

        # Trip the breaker once the failure threshold is reached.
        should_open = state["failures"] >= state["threshold"] and state["state"] != "OPEN"
        if should_open:
            state["state"] = "OPEN"
            state["opened_at"] = time.time()
            # Notify operators that calls to this service are now blocked.
            send_circuit_breaker_alert.send(key, state)

        HyrexKV.set(cb_key, json.dumps(state), expiry_seconds=3600)

    except Exception as e:
        logger.error(f"Failed to update circuit breaker {key}: {e}")

def reset_circuit_breaker(key: str):
    """Reset the breaker for *key* to CLOSED with zero failures after a success.

    Best-effort: a failed reset must never break the caller's success path,
    so any error is logged and swallowed.
    """

    cb_key = f"circuit_breaker:{key}"

    try:
        cb_data = HyrexKV.get(cb_key)
        if cb_data:
            cb_state = json.loads(cb_data)
            cb_state["failures"] = 0
            cb_state["state"] = "CLOSED"
            HyrexKV.set(cb_key, json.dumps(cb_state), expiry_seconds=3600)
    except Exception as e:
        # Fix: was a bare `except: pass`, which hid failures and swallowed
        # KeyboardInterrupt/SystemExit. Keep best-effort behavior, but log.
        logger.warning(f"Failed to reset circuit breaker {key}: {e}")

@hy.task
def send_critical_error_alert(operation: str, error_message: str, error_details: Dict):
    """Send a multi-channel alert for a critical error.

    Dispatched asynchronously (via .send) from monitored_task when a caught
    TaskError has CRITICAL severity.

    Args:
        operation: Name of the operation that failed.
        error_message: Human-readable error text.
        error_details: Structured record produced by log_task_error.
    """
    
    alert_message = f"""
    🚨 CRITICAL ERROR ALERT 🚨
    
    Operation: {operation}
    Error: {error_message}
    Error ID: {error_details.get('error_id')}
    Task ID: {error_details.get('task_id')}
    Time: {error_details.get('timestamp')}
    
    Immediate attention required!
    """
    
    # Fan out to both on-call channels.
    # NOTE(review): send_slack_alert / send_email_alert are not defined in
    # this file — presumably provided elsewhere in the project; confirm.
    send_slack_alert(alert_message, channel="#alerts")
    send_email_alert("Critical System Error", alert_message, ["oncall@company.com"])
    
    return {"alert_sent": True, "error_id": error_details.get('error_id')}

@hy.task
def send_circuit_breaker_alert(key: str, cb_state: Dict):
    """Notify infrastructure channel that a circuit breaker has opened.

    Dispatched asynchronously (via .send) from record_circuit_breaker_failure
    when the failure count crosses the breaker's threshold.

    Args:
        key: The circuit-breaker key (service identifier).
        cb_state: Breaker state dict with failures/threshold/opened_at.
    """
    
    alert_message = f"""
    ⚡ Circuit Breaker OPEN ⚡
    
    Service: {key}
    Failure Count: {cb_state['failures']}
    Threshold: {cb_state['threshold']}
    Opened At: {datetime.fromtimestamp(cb_state['opened_at']).isoformat()}
    
    Service calls are being blocked to prevent cascade failures.
    """
    
    # NOTE(review): send_slack_alert is not defined in this file —
    # presumably provided elsewhere in the project; confirm.
    send_slack_alert(alert_message, channel="#infrastructure")
    
    return {"alert_sent": True, "circuit_breaker": key}

@hy.task
def generate_error_report(operation: Optional[str] = None, hours: int = 24):
    """Build and persist an error-analysis report covering the last *hours* hours.

    Args:
        operation: Limit the report to one operation; None analyzes all
            monitored operations.
        hours: Size of the reporting window.

    Returns:
        Dict with the stored report_id, total error count, and the number of
        operations analyzed.
    """

    now = datetime.now()
    window_start = now - timedelta(hours=hours)

    # Either a single requested operation or everything being monitored.
    targets = [operation] if operation else get_all_monitored_operations()

    report = {
        "period": {
            "start": window_start.isoformat(),
            "end": now.isoformat(),
            "hours": hours,
        },
        "operations": {},
        "summary": {
            "total_errors": 0,
            "most_common_errors": {},
            "error_trends": {},
            "recovery_stats": {},
        },
    }

    # Fold each operation's metrics into the report and the running total.
    for name in targets:
        metrics = get_error_metrics(name, window_start, now)
        if metrics:
            report["operations"][name] = metrics
            report["summary"]["total_errors"] += metrics.get("total_errors", 0)

    report_id = f"error_report_{int(time.time())}"
    HyrexKV.set(
        f"report:{report_id}",
        json.dumps(report),
        expiry_seconds=2592000  # 30 days
    )

    # Escalate noisy periods to stakeholders.
    if report["summary"]["total_errors"] > 10:
        send_error_report_email.send(report_id, report)

    return {
        "report_id": report_id,
        "total_errors": report["summary"]["total_errors"],
        "operations_analyzed": len(targets),
    }

# Helper functions (simplified implementations)
def perform_operation(operation: str, data: Dict) -> Any:
    """Placeholder operation dispatcher.

    Raises a canned error for the two test operation names; any other
    operation succeeds and echoes its payload.
    """
    if operation == "test_validation_error":
        raise ValidationError("Invalid input data", {"field": "email"})
    if operation == "test_api_error":
        raise ExternalAPIError("API service unavailable", 503)
    return {"success": True, "data": data}

def update_task_metrics(operation: str, status: str):
    """Update task execution metrics"""
    # Placeholder: a real implementation would record a counter/timer keyed
    # by (operation, status) in a metrics backend.
    pass

def record_task_success(operation: str, duration: float, task_context: Any):
    """Record successful task execution"""
    # Placeholder: a real implementation would emit duration/success metrics
    # for the given operation and task context.
    pass

def get_fallback_data(operation: str) -> Dict:
    """Return a safe fallback payload for *operation* (placeholder)."""
    fallback_payload: Dict = {"fallback": True}
    return fallback_payload

def execute_alternative_workflow(error_details: Dict) -> Dict:
    """Run the alternative workflow for a failed operation (placeholder)."""
    outcome: Dict = {"alternative_executed": True}
    return outcome

Usage Examples

Basic Error Handling

# Process task with error handling
curl -X POST http://localhost:8000/tasks/monitored \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "process_user_data",
    "data": {"user_id": 123, "email": "test@example.com"},
    "circuit_breaker_key": "user_service"
  }'

# Make resilient API call
curl -X POST http://localhost:8000/tasks/api-call \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://api.external.com/data",
    "method": "GET",
    "circuit_breaker_key": "external_api"
  }'

Error Recovery

# Trigger error recovery workflow  
curl -X POST http://localhost:8000/recovery/start \
  -H "Content-Type: application/json" \
  -d '{
    "error_id": "err_1234567890_5678",
    "recovery_strategy": "retry_with_fallback"
  }'

# Generate error report
curl -X POST http://localhost:8000/reports/errors \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "process_payments", 
    "hours": 48
  }'

Circuit Breaker Configuration

# Configure circuit breaker thresholds
# Per-service circuit-breaker tuning. Keys match the circuit_breaker_key
# values passed to tasks; values configure each breaker's behaviour.
# NOTE(review): "half_open_max" is not read by the breaker helpers shown in
# this example — confirm it is consumed elsewhere before relying on it.
CIRCUIT_BREAKER_CONFIG = {
    "payment_service": {
        "threshold": 3,        # Open after 3 failures
        "timeout": 30,         # Stay open for 30 seconds
        "half_open_max": 1     # Allow 1 request in half-open state
    },
    "external_api": {
        "threshold": 5,
        "timeout": 60,
        "half_open_max": 2
    }
}

Monitoring Dashboard

@hy.task
def health_check_dashboard():
    """Collect per-service circuit-breaker status and recent error metrics.

    Returns:
        Dict with a timestamp and, per monitored service, the breaker status,
        error rate, recent error count, and availability figure.
    """

    monitored = ["payment_service", "user_service", "external_api"]
    snapshot = {
        "timestamp": datetime.now().isoformat(),
        "services": {},
    }

    for name in monitored:
        # Breaker state first, then the last hour's error metrics.
        breaker_status = check_circuit_breaker(name)
        metrics = get_recent_error_metrics(name, hours=1)

        snapshot["services"][name] = {
            "circuit_breaker_status": breaker_status,
            "error_rate": metrics.get("error_rate", 0),
            "recent_errors": metrics.get("total_errors", 0),
            "availability": calculate_availability(name),
        }

    return snapshot

Production Considerations

  • Error classification: Properly categorize errors for appropriate handling
  • Circuit breaker tuning: Configure thresholds based on service characteristics
  • Alert fatigue: Implement intelligent alerting to avoid noise
  • Error aggregation: Use tools like Sentry or Rollbar for error tracking
  • Performance impact: Monitor overhead of error handling mechanisms
  • Recovery testing: Regularly test error recovery workflows

Next Steps

Human-in-the-Loop

Add human oversight to error recovery

Batch Processing

Handle errors in batch operations