Essential best practices for running Hyrex reliably in production environments.

Production Checklist

Post-Deployment

  • Configure monitoring dashboards
  • Set up alerts for key metrics
  • Create runbooks for common issues
  • Schedule regular performance reviews
  • Plan for capacity scaling
  • Document incident response procedures
  • Set up log rotation and retention (see the sketch after this list)
  • Configure backup strategies
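
For log rotation, a minimal sketch using only the standard library (the log path, size limit, and archive count are assumptions; align them with your retention policy):

import logging
from logging.handlers import RotatingFileHandler

# Rotate at roughly 10 MB and keep 5 archives (both values are assumptions)
handler = RotatingFileHandler(
    "/var/log/hyrex/worker.log", maxBytes=10 * 1024 * 1024, backupCount=5
)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
)
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)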

Security Best Practices

API Key Management

# Use environment variables for secrets
import os

# Good: Environment variables
api_key = os.getenv("HYREX_API_KEY")

# Bad: Hardcoded secrets
api_key = "prod_hx_12345"  # Never do this
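
Since os.getenv returns None when a variable is absent, it pays to fail fast at startup instead of letting requests fail later with confusing authentication errors; a minimal sketch:

import os

api_key = os.getenv("HYREX_API_KEY")
if not api_key:
    # Refuse to start rather than run with a missing credential
    raise RuntimeError("HYREX_API_KEY is not set")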

Task Security

# Validate input data
from pydantic import BaseModel, validator

class SecureTaskContext(BaseModel):
    user_id: int
    file_path: str
    
    @validator('file_path')  # pydantic v1 style; use @field_validator on pydantic v2
    def validate_file_path(cls, v):
        # Prevent directory traversal
        if '..' in v or v.startswith('/'):
            raise ValueError('Invalid file path')
        return v

@hy.task
def secure_file_processor(ctx: SecureTaskContext):
    # Always validate and sanitize inputs
    if ctx.user_id <= 0:
        raise ValueError("Invalid user ID")
    
    # Your secure processing logic
    pass
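
Because the checks live on the model, bad input is rejected before the task body ever runs; a quick usage example:

from pydantic import ValidationError

# Well-formed input parses cleanly
ctx = SecureTaskContext(user_id=42, file_path="uploads/report.csv")

# Directory traversal fails validation with a ValidationError
try:
    SecureTaskContext(user_id=42, file_path="../etc/passwd")
except ValidationError as exc:
    print(exc)  # file_path: Invalid file path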

Error Handling

Structured Error Handling

import logging

from pydantic import ValidationError

class TaskError(Exception):
    """Custom exception for task failures"""
    def __init__(self, message: str, error_code: str, retryable: bool = True):
        self.message = message
        self.error_code = error_code
        self.retryable = retryable
        super().__init__(message)

@hy.task(max_retries=3)
def robust_task(ctx):
    try:
        # Your task logic
        return process_data(ctx.data)
    
    except ConnectionError as e:
        # Retryable error
        logging.warning(f"Connection failed, will retry: {e}")
        raise TaskError(f"Connection failed: {e}", "CONNECTION_ERROR", retryable=True)
    
    except ValidationError as e:
        # Non-retryable error
        logging.error(f"Invalid data, won't retry: {e}")
        raise TaskError(f"Invalid data: {e}", "VALIDATION_ERROR", retryable=False)
    
    except Exception as e:
        # Unknown error
        logging.error(f"Unexpected error: {e}", exc_info=True)
        raise TaskError(f"Unexpected error: {e}", "UNKNOWN_ERROR")

Dead Letter Queues

import logging
import time

@hy.task(
    queue="main-processing",
    max_retries=3,
    on_failure="handle_failed_task"
)
def main_task(ctx):
    # Main processing logic
    pass

@hy.task(queue="dead-letter")
def handle_failed_task(ctx, error_info):
    """Handle tasks that failed all retries"""
    
    logging.error(f"Task failed permanently: {error_info}")
    
    # Store for manual review
    store_failed_task({
        "task_id": ctx.task_id,
        "error": error_info,
        "timestamp": time.time(),
        "data": ctx.data
    })
    
    # Notify administrators
    send_alert(f"Task {ctx.task_id} failed permanently")
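
store_failed_task and send_alert above are placeholders for your own storage and alerting integrations. As one possibility, a send_alert sketch that posts a Slack-style payload to a webhook (ALERT_WEBHOOK_URL is a hypothetical setting):

import json
import os
import urllib.request

ALERT_WEBHOOK_URL = os.environ["ALERT_WEBHOOK_URL"]  # hypothetical env var

def send_alert(subject: str, body: str = ""):
    """Post an alert to an incident/chat webhook."""
    payload = json.dumps({"text": f"{subject}\n{body}".strip()}).encode()
    request = urllib.request.Request(
        ALERT_WEBHOOK_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request, timeout=5)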

Performance Best Practices

Memory Management

import gc
from contextlib import contextmanager

@contextmanager
def memory_cleanup():
    """Context manager for memory cleanup"""
    try:
        yield
    finally:
        gc.collect()

@hy.task
def memory_efficient_task(ctx):
    """Process large datasets efficiently"""
    
    with memory_cleanup():
        # Process data in chunks
        for chunk in chunked_data(ctx.large_dataset, chunk_size=1000):
            with memory_cleanup():
                process_chunk(chunk)
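
chunked_data above is a placeholder; one possible implementation that works for any iterable:

from itertools import islice

def chunked_data(items, chunk_size=1000):
    """Yield lists of up to chunk_size items from any iterable."""
    iterator = iter(items)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk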

Database Connections

import os
from contextlib import contextmanager

import psycopg2.pool

# Connection pool shared by the worker (min 1, max 20 connections)
DATABASE_URL = os.environ["DATABASE_URL"]
connection_pool = psycopg2.pool.ThreadedConnectionPool(1, 20, DATABASE_URL)

@contextmanager
def get_db_connection():
    conn = connection_pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        connection_pool.putconn(conn)

@hy.task
def database_task(ctx):
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users WHERE id = %s", (ctx.user_id,))
        return cursor.fetchone()

Troubleshooting Guide

Common Issues

High Memory Usage

# Monitor memory usage (psutil.virtual_memory() is system-wide; use
# psutil.Process().memory_info().rss for per-process numbers)
import logging

import psutil

@hy.task
def memory_monitored_task(ctx):
    memory_before = psutil.virtual_memory().percent
    
    # Your task logic
    result = process_data(ctx)
    
    memory_after = psutil.virtual_memory().percent
    memory_diff = memory_after - memory_before
    
    if memory_diff > 10:  # 10% increase
        logging.warning(f"High memory usage increase: {memory_diff}%")
    
    return result

Slow Tasks

import logging
import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start_time
        
        logging.info(f"{func.__name__} took {duration:.2f} seconds")
        
        if duration > 60:  # Warn if task takes over 1 minute
            logging.warning(f"Slow task detected: {func.__name__} took {duration:.2f}s")
        
        return result
    return wrapper

@hy.task
@timing_decorator
def timed_task(ctx):
    # Your task logic
    pass

Queue Backlog

@hy.schedule("*/5 * * * *")  # Every 5 minutes
def monitor_queue_health():
    """Monitor queue depths and alert on backlogs"""
    
    queue_depths = {
        "critical": hy.get_queue_depth("critical"),
        "normal": hy.get_queue_depth("normal"),
        "batch": hy.get_queue_depth("batch")
    }
    
    alerts = []
    for queue, depth in queue_depths.items():
        if depth > 1000:
            alerts.append(f"High backlog in {queue}: {depth} tasks")
    
    if alerts:
        send_alert("Queue Backlog Alert", "\n".join(alerts))
    
    return queue_depths

Maintenance Tasks

Regular Cleanup

@hy.schedule("0 2 * * 0")  # Weekly at 2 AM Sunday
def weekly_cleanup():
    """Perform weekly maintenance tasks"""
    
    # Clean up old task results
    cleanup_old_results(older_than_days=30)
    
    # Archive completed tasks
    archive_completed_tasks(older_than_days=7)
    
    # Generate performance report
    generate_weekly_report()
    
    return {"cleanup_completed": True}

@hy.schedule("0 0 1 * *")  # Monthly on 1st at midnight
def monthly_maintenance():
    """Monthly maintenance and optimization"""
    
    # Database maintenance
    optimize_database_indexes()
    
    # Update metrics
    generate_monthly_metrics()
    
    # Capacity planning
    analyze_capacity_trends()
    
    return {"maintenance_completed": True}

Health Checks

import time

@hy.task
def health_check():
    """Comprehensive health check"""
    
    checks = {
        "database": check_database_connection(),
        "redis": check_redis_connection(),
        "external_apis": check_external_apis(),
        "disk_space": check_disk_space(),
        "memory_usage": check_memory_usage()
    }
    
    healthy = all(checks.values())
    
    if not healthy:
        failed_checks = [k for k, v in checks.items() if not v]
        send_alert(f"Health check failed: {', '.join(failed_checks)}")
    
    return {
        "healthy": healthy,
        "checks": checks,
        "timestamp": time.time()
    }
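
The individual checks are placeholders for your environment. As one example, check_disk_space can be implemented with the standard library (the 10% free threshold is an assumption; tune it to your infrastructure):

import shutil

def check_disk_space(path: str = "/", min_free_ratio: float = 0.10) -> bool:
    """Return True if at least min_free_ratio of the filesystem is free."""
    usage = shutil.disk_usage(path)
    return (usage.free / usage.total) >= min_free_ratio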

Support Resources

Emergency Contacts

Create an incident response plan with:
  • On-call engineer contacts
  • Escalation procedures
  • Service dependencies
  • Recovery procedures
