Documentation Index: fetch the complete documentation index at https://hyrex.io/docs/llms.txt
Use this file to discover all available pages before exploring further.
Essential best practices for running Hyrex reliably in production environments.
Production Checklist
Post-Deployment
Security Best Practices
API Key Management
# Use environment variables for secrets
import os
# Good: Environment variables
api_key = os.getenv("HYREX_API_KEY")
# Bad: Hardcoded secrets -- never do this.
# The anti-pattern is shown commented out so that running or pasting this
# snippet cannot overwrite the value read from the environment above.
# api_key = "prod_hx_12345"
Task Security
# Validate input data
from pydantic import BaseModel, validator
class SecureTaskContext(BaseModel):
    """Validated input for a file-processing task.

    Fields:
        user_id: integer identifier of the requesting user.
        file_path: relative path; checked by ``validate_file_path``.
    """

    user_id: int
    file_path: str

    # NOTE(review): pydantic v2 deprecates `validator` in favour of
    # `field_validator`; confirm which major version this project pins.
    @validator('file_path')
    def validate_file_path(cls, v):
        """Reject paths that could escape the intended directory.

        Args:
            v: the candidate ``file_path`` value.

        Returns:
            ``v`` unchanged when it is acceptable.

        Raises:
            ValueError: if ``v`` is empty, contains a NUL byte or ``..``,
                or is absolute (POSIX root, backslash root, or drive-qualified).
        """
        # Treat backslashes as separators so Windows-style roots are caught too.
        normalized = v.replace('\\', '/')
        # "C:..." style drive prefixes are absolute (or drive-relative) on Windows.
        is_absolute = normalized.startswith('/') or normalized[1:2] == ':'
        # Prevent directory traversal
        if not v or '\x00' in v or '..' in v or is_absolute:
            raise ValueError('Invalid file path')
        return v
@hy.task
def secure_file_processor(ctx: SecureTaskContext):
    """Check the caller's user ID before any processing takes place.

    Args:
        ctx: validated task input.

    Raises:
        ValueError: if ``ctx.user_id`` is falsy or not greater than zero.
    """
    # Always validate and sanitize inputs
    user_id = ctx.user_id
    if not user_id or user_id <= 0:
        raise ValueError("Invalid user ID")

    # Your secure processing logic
Error Handling
Structured Error Handling
import logging
from enum import Enum
class TaskError(Exception):
    """Task failure that carries an error code and a retry hint.

    Instance attributes:
        message: human-readable description (also passed to ``Exception``).
        error_code: short identifier for the failure category.
        retryable: whether the failure is considered worth retrying;
            defaults to ``True``.
    """

    def __init__(self, message: str, error_code: str, retryable: bool = True):
        # Initialise the base exception first so str(exc) yields the message.
        super().__init__(message)
        self.retryable = retryable
        self.error_code = error_code
        self.message = message
@hy.task(max_retries=3)
def robust_task(ctx):
    """Run ``process_data`` on ``ctx.data`` and classify any failure.

    Returns:
        Whatever ``process_data(ctx.data)`` returns.

    Raises:
        TaskError: with code ``CONNECTION_ERROR`` (``retryable=True``) for a
            ``ConnectionError``, ``VALIDATION_ERROR`` (``retryable=False``)
            for a ``ValidationError``, and ``UNKNOWN_ERROR`` for any other
            exception. A ``TaskError`` raised by the task logic itself
            propagates unchanged. The original exception is chained as
            ``__cause__``.
    """
    # NOTE(review): ValidationError is not imported anywhere in view;
    # presumably pydantic's (the file already imports pydantic) -- confirm.
    from pydantic import ValidationError

    # NOTE(review): whether the task runner consults `retryable` when applying
    # max_retries is not visible here -- confirm against the Hyrex docs.
    try:
        # Your task logic
        return process_data(ctx.data)
    except TaskError:
        # Already classified: re-wrapping it in the catch-all below would
        # replace its error_code and retryable flag with UNKNOWN_ERROR.
        raise
    except ConnectionError as e:
        # Retryable error
        logging.warning(f"Connection failed, will retry: {e}")
        raise TaskError(
            f"Connection failed: {e}", "CONNECTION_ERROR", retryable=True
        ) from e
    except ValidationError as e:
        # Non-retryable error
        logging.error(f"Invalid data, won't retry: {e}")
        raise TaskError(
            f"Invalid data: {e}", "VALIDATION_ERROR", retryable=False
        ) from e
    except Exception as e:
        # Unknown error
        logging.error(f"Unexpected error: {e}", exc_info=True)
        raise TaskError(f"Unexpected error: {e}", "UNKNOWN_ERROR") from e
Dead Letter Queues
@hy.task(
    max_retries=3,
    queue="main-processing",
    # Names the handler task declared for the "dead-letter" queue.
    on_failure="handle_failed_task",
)
def main_task(ctx):
    """Placeholder for the main processing logic."""
@hy.task(queue="dead-letter")
def handle_failed_task(ctx, error_info):
    """Handle tasks that failed all retries.

    Logs the failure, stores a record for manual review, and notifies
    administrators. The notification is sent even if storing the record
    raises; that exception still propagates afterwards.

    Args:
        ctx: task context; ``ctx.task_id`` and ``ctx.data`` are read.
        error_info: failure details, logged and stored as given.
    """
    # Imported locally: no `import time` precedes this handler in the file.
    import time

    logging.error(f"Task failed permanently: {error_info}")

    try:
        # Store for manual review
        store_failed_task({
            "task_id": ctx.task_id,
            "error": error_info,
            "timestamp": time.time(),
            "data": ctx.data,
        })
    finally:
        # Notify administrators regardless of whether the record was stored.
        send_alert(f"Task {ctx.task_id} failed permanently")
Memory Management
import gc
from contextlib import contextmanager
@contextmanager
def memory_cleanup():
    """Run a garbage-collection pass when the managed block exits.

    Yields ``None``. ``gc.collect()`` is called on exit whether the block
    finished normally or raised; exceptions are not suppressed.
    """
    try:
        yield None
    finally:
        # Executed on both the normal and the exceptional exit path.
        gc.collect()
@hy.task
def memory_efficient_task(ctx):
    """Process ``ctx.large_dataset`` in chunks of 1000.

    Each chunk is handled inside its own ``memory_cleanup()`` block, and the
    whole loop is wrapped in one more.
    """
    with memory_cleanup():
        # Process data in chunks
        batches = chunked_data(ctx.large_dataset, chunk_size=1000)
        for batch in batches:
            with memory_cleanup():
                process_chunk(batch)
Database Connections
from contextlib import contextmanager
import psycopg2.pool
# Connection pool: holds between 1 and 20 connections opened with DATABASE_URL.
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    dsn=DATABASE_URL,
)
@contextmanager
def get_db_connection():
    """Borrow a connection from ``connection_pool`` for one unit of work.

    Yields:
        The borrowed connection.

    The connection is committed when the managed block finishes without an
    ``Exception``; otherwise it is rolled back and the exception re-raised.
    In every case the connection is handed back to the pool.
    """
    borrowed = connection_pool.getconn()
    try:
        yield borrowed
        # Reached only when the caller's block did not raise.
        borrowed.commit()
    except Exception:
        borrowed.rollback()
        raise
    finally:
        connection_pool.putconn(borrowed)
@hy.task
def database_task(ctx):
    """Fetch the ``users`` row whose ``id`` equals ``ctx.user_id``.

    Returns:
        The result of ``cursor.fetchone()`` for the query.
    """
    # The cursor is used as a context manager so it is closed once the row has
    # been fetched, instead of being left open.
    with get_db_connection() as conn, conn.cursor() as cursor:
        # Parameterized query: the driver handles quoting of user_id.
        cursor.execute("SELECT * FROM users WHERE id = %s", (ctx.user_id,))
        return cursor.fetchone()
Troubleshooting Guide
Common Issues
High Memory Usage
# Monitor memory usage
import psutil
@hy.task
def memory_monitored_task(ctx):
    """Run ``process_data(ctx)`` and warn when this process's memory grows.

    Memory is sampled before and after the task logic as the current
    process's share of physical memory, in percent. A warning is logged when
    that share rises by more than 10 percentage points.

    Returns:
        Whatever ``process_data(ctx)`` returns.
    """
    # Sample this process rather than psutil.virtual_memory(): the host-wide
    # figure also moves with every other process on the machine.
    # NOTE(review): assumes the task runs inside the process being sampled --
    # confirm against the worker execution model.
    process = psutil.Process()
    memory_before = process.memory_percent()

    # Your task logic
    result = process_data(ctx)

    memory_after = process.memory_percent()
    memory_diff = memory_after - memory_before
    if memory_diff > 10:  # 10% increase
        logging.warning(f"High memory usage increase: {memory_diff}%")
    return result
Slow Tasks
import time
from functools import wraps
def timing_decorator(func):
    """Wrap ``func`` so that the duration of each successful call is logged.

    The elapsed time is logged at INFO level; calls taking longer than 60
    seconds are additionally logged at WARNING level. If ``func`` raises, the
    exception propagates and nothing is logged.

    Args:
        func: the callable to time.

    Returns:
        A wrapper with the same metadata as ``func`` (via ``functools.wraps``)
        that returns ``func``'s result unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock; time.time() can
        # jump when the system clock is adjusted, which skews durations.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start_time

        logging.info(f"{func.__name__} took {duration:.2f} seconds")
        if duration > 60:  # Warn if task takes over 1 minute
            logging.warning(
                f"Slow task detected: {func.__name__} took {duration:.2f}s"
            )
        return result
    return wrapper
@hy.task
@timing_decorator
def timed_task(ctx):
    """Placeholder for your task logic; calls are timed by ``timing_decorator``."""
Queue Backlog
@hy.schedule("*/5 * * * *")  # Every 5 minutes
def monitor_queue_health():
    """Monitor queue depths and alert on backlogs.

    Reads the depth of the "critical", "normal" and "batch" queues and sends
    a single alert listing every queue whose depth exceeds 1000.

    Returns:
        A dict mapping each queue name to its depth.
    """
    queue_depths = {}
    for name in ("critical", "normal", "batch"):
        queue_depths[name] = hy.get_queue_depth(name)

    # One line per queue that is over the backlog threshold.
    alerts = [
        f"High backlog in {queue}: {depth} tasks"
        for queue, depth in queue_depths.items()
        if depth > 1000
    ]

    if alerts:
        send_alert("Queue Backlog Alert", "\n".join(alerts))
    return queue_depths
Maintenance Tasks
Regular Cleanup
@hy.schedule("0 2 * * 0")  # Weekly at 2 AM Sunday
def weekly_cleanup():
    """Perform weekly maintenance tasks.

    Runs, in order: result cleanup (older than 30 days), archiving of
    completed tasks (older than 7 days), and the weekly report.

    Returns:
        ``{"cleanup_completed": True}`` once every step has returned.
    """
    cleanup_old_results(older_than_days=30)    # Clean up old task results
    archive_completed_tasks(older_than_days=7)  # Archive completed tasks
    generate_weekly_report()                    # Generate performance report

    outcome = {"cleanup_completed": True}
    return outcome
@hy.schedule("0 0 1 * *")  # Monthly on 1st at midnight
def monthly_maintenance():
    """Monthly maintenance and optimization.

    Runs, in order: database index optimization, monthly metrics generation,
    and capacity trend analysis.

    Returns:
        ``{"maintenance_completed": True}`` once every step has returned.
    """
    optimize_database_indexes()  # Database maintenance
    generate_monthly_metrics()   # Update metrics
    analyze_capacity_trends()    # Capacity planning

    outcome = {"maintenance_completed": True}
    return outcome
Health Checks
@hy.task
def health_check():
    """Comprehensive health check.

    Runs every probe, sends one alert naming the probes that failed, and
    reports the overall status. A probe that raises is logged with its
    traceback and recorded as failed, so one broken dependency cannot abort
    the whole check.

    Returns:
        A dict with ``"healthy"`` (True only if every probe result is truthy),
        ``"checks"`` (probe name -> result) and ``"timestamp"``
        (``time.time()`` at completion).
    """
    probes = {
        "database": check_database_connection,
        "redis": check_redis_connection,
        "external_apis": check_external_apis,
        "disk_space": check_disk_space,
        "memory_usage": check_memory_usage,
    }

    checks = {}
    for name, probe in probes.items():
        try:
            checks[name] = probe()
        except Exception:
            # Treat a crashing probe as a failed check rather than letting it
            # prevent the remaining probes and the alert from running.
            logging.exception("Health check probe %r raised", name)
            checks[name] = False

    healthy = all(checks.values())
    if not healthy:
        failed_checks = [k for k, v in checks.items() if not v]
        send_alert(f"Health check failed: {', '.join(failed_checks)}")

    return {
        "healthy": healthy,
        "checks": checks,
        "timestamp": time.time(),
    }
Support Resources
Getting Help
Create an incident response plan with:
- On-call engineer contacts
- Escalation procedures
- Service dependencies
- Recovery procedures
Next Steps
Deployment: Environment setup and deployment
Monitoring & Scaling: Set up monitoring and scaling strategies