Learn from practical examples that you can copy and adapt for your use cases.

Background Job Processing

Process tasks asynchronously without blocking your main application.
from hyrex import HyrexRegistry
from pydantic import BaseModel
import requests

# Registry whose `task` / `workflow` decorators are used by the examples on this page.
hy = HyrexRegistry()

class ImageProcessingContext(BaseModel):
    """Arguments passed to the `process_uploaded_image` task."""

    # URL the original image is downloaded from.
    image_url: str
    # Owner of the image; becomes part of each generated CDN URL.
    user_id: int
    # (width, height) pairs; one processed URL is produced per pair.
    sizes: list[tuple[int, int]] = [(100, 100), (300, 300), (800, 800)]

@hy.task(
    queue="image-processing",
    max_retries=3,
    timeout_seconds=300
)
def process_uploaded_image(context: ImageProcessingContext):
    """Generate multiple sizes of an uploaded image.

    Downloads the original from ``context.image_url`` and produces one CDN URL
    per ``(width, height)`` pair in ``context.sizes``. The resize/upload step is
    left as a placeholder for your own logic.

    Args:
        context: Image URL, owning user id, and the sizes to generate.

    Returns:
        dict: ``{"original": <source url>, "processed": [<url>, ...],
        "user_id": <id>}``.

    Raises:
        requests.RequestException: If the download fails, times out, or the
            server answers with an HTTP error status. The error is left to
            propagate so the task fails visibly instead of processing an
            error page as if it were image bytes.
    """

    # Download original image. The timeout keeps a stalled server from holding
    # the worker until the 300-second task timeout; raise_for_status() rejects
    # 4xx/5xx responses whose body is not the image.
    response = requests.get(context.image_url, timeout=30)
    response.raise_for_status()
    image_data = response.content  # input for your processing logic below

    processed_images = []
    for width, height in context.sizes:
        # Your image processing logic here
        # Example: resize, optimize, upload to S3
        processed_url = f"https://cdn.example.com/{context.user_id}/{width}x{height}.jpg"
        processed_images.append(processed_url)

    # Notify user or update database
    return {
        "original": context.image_url,
        "processed": processed_images,
        "user_id": context.user_id
    }

# In your web application
@app.post("/upload")
def handle_upload(image: UploadFile, user_id: int):
    # Persist the upload first so the background task has a URL to fetch.
    stored_url = save_to_storage(image)

    # Enqueue the resize job instead of doing the work inside the request.
    job_args = ImageProcessingContext(image_url=stored_url, user_id=user_id)
    queued = process_uploaded_image.send(job_args)

    # The task id is returned with the response.
    return {"message": "Upload successful", "task_id": queued.id}

Email Notifications

Deliver email reliably with automatic retries.
from datetime import datetime

class EmailContext(BaseModel):
    """Arguments passed to the `send_email` task."""

    # Recipient address.
    to: str
    # Subject line.
    subject: str
    # Template name handed to `render_template` (e.g. "welcome.html").
    template: str
    # Values handed to `render_template` together with the template name.
    context: dict

@hy.task(
    max_retries=5,
    retry_backoff=True  # Built-in exponential backoff
)
def send_email(context: EmailContext):
    """Render a template and send it as a transactional email.

    Args:
        context: Recipient, subject, template name, and template values.

    Returns:
        dict: The provider's ``message_id`` and an ISO-formatted ``sent_at``
        timestamp.

    Raises:
        Exception: If the provider answers with a status code of 500 or
            higher, so that the task is retried.
    """
    # Build the HTML body from the named template and its values.
    rendered_html = render_template(context.template, context.context)

    # Hand the message to the email provider.
    provider_response = email_provider.send(
        to=context.to,
        subject=context.subject,
        html=rendered_html,
    )

    # Server-side failures are raised so the retry policy takes over.
    status = provider_response.status_code
    if status >= 500:
        raise Exception(f"Email provider error: {status}")

    delivery = {"message_id": provider_response.message_id}
    delivery["sent_at"] = datetime.now().isoformat()
    return delivery

# Send welcome email after user signup
@app.post("/signup")
def signup(user: UserCreate):
    # Create the account first; its id is included in the email's template values.
    new_user_id = create_user(user)

    # Queue the welcome email rather than sending it inside the request.
    welcome = EmailContext(
        to=user.email,
        subject="Welcome to Our App!",
        template="welcome.html",
        context={"name": user.name, "user_id": new_user_id},
    )
    queued = send_email.send(welcome)

    return {"user_id": new_user_id, "email_task_id": str(queued.id)}

ETL Pipeline

Build data pipelines with workflow orchestration.
from datetime import datetime, timedelta
from hyrex import HyrexKV, get_hyrex_context

class ETLContext(BaseModel):
    """Arguments shared by the ETL tasks and the `daily_etl_pipeline` workflow."""

    # Table that `extract_data` reads from.
    source_table: str
    # Table that `load_data` bulk-inserts into.
    destination_table: str
    # Day to process; `run_daily_etl` supplies it as "YYYY-MM-DD".
    date: str

@hy.task
def extract_data(config: ETLContext):
    """Extract one day of rows from the source table into a temporary Parquet file.

    The file path is stored in the KV store under
    ``workflow-<workflow_run_id>-extract`` so the next task in the workflow can
    find it.

    Args:
        config: Source table and date to extract (destination is unused here).

    Returns:
        dict: ``{"file_path": <parquet path>, "row_count": <number of rows>}``.

    Raises:
        ValueError: If ``config.source_table`` is not a plain (optionally
            dot-qualified) identifier, or ``config.date`` is not in
            ``YYYY-MM-DD`` form.
    """
    import re  # local import: only this task needs it

    # Both values are interpolated into the SQL text below, and the date is
    # also interpolated into a file path. Reject anything that is not a plain
    # identifier / ISO-style date so neither the query nor the path can be
    # altered by the task arguments.
    identifier = r"[A-Za-z_][A-Za-z0-9_]*"
    if not re.fullmatch(rf"{identifier}(\.{identifier})*", config.source_table):
        raise ValueError(f"Invalid source table name: {config.source_table!r}")
    if not re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", config.date):
        raise ValueError(f"Invalid date (expected YYYY-MM-DD): {config.date!r}")

    ctx = get_hyrex_context()

    query = f"SELECT * FROM {config.source_table} WHERE date = '{config.date}'"
    data = database.execute(query)

    # Save to temporary storage
    file_path = f"/tmp/extract_{config.date}.parquet"
    save_parquet(data, file_path)

    # Store path for next task
    HyrexKV.set(f"workflow-{ctx.workflow_run_id}-extract", file_path)

    return {"file_path": file_path, "row_count": len(data)}

@hy.task
def transform_data(config: ETLContext):
    """Transform the extracted Parquet file and write the result next to it.

    Reads the extract path from the KV store key
    ``workflow-<workflow_run_id>-extract`` and publishes the output path under
    ``workflow-<workflow_run_id>-transform``.

    Returns:
        dict: ``{"file_path": <output path>, "row_count": <number of rows>}``.
    """
    run_id = get_hyrex_context().workflow_run_id

    # Locate the file written by the extract step of this workflow run.
    extracted_path = HyrexKV.get(f"workflow-{run_id}-extract")

    # Read it and apply the transformations.
    frame = apply_transformations(load_parquet(extracted_path))

    # Derive the output name from the input name and write the result.
    transformed_path = extracted_path.replace("extract_", "transform_")
    save_parquet(frame, transformed_path)

    # Publish the output path for the load step.
    HyrexKV.set(f"workflow-{run_id}-transform", transformed_path)

    return {"file_path": transformed_path, "row_count": len(frame)}

@hy.task
def load_data(config: ETLContext):
    """Load the transformed data into the destination table and remove temp files.

    Reads the transformed file path from the KV store key
    ``workflow-<workflow_run_id>-transform``, bulk-inserts its rows into
    ``config.destination_table``, then deletes the temporary files of this
    workflow run.

    Returns:
        dict: ``{"loaded_rows": <number of rows inserted>}``.
    """
    ctx = get_hyrex_context()

    # Get file path from previous task
    file_path = HyrexKV.get(f"workflow-{ctx.workflow_run_id}-transform")

    # Load to destination
    df = load_parquet(file_path)
    database.bulk_insert(config.destination_table, df)

    # Cleanup temp files. The extract-stage file is included as well so it is
    # not left behind in /tmp. Its lookup is best-effort: the insert above has
    # already succeeded, so a missing KV entry must not fail (and re-run) the
    # task.
    temp_files = [file_path]
    try:
        extract_path = HyrexKV.get(f"workflow-{ctx.workflow_run_id}-extract")
    except Exception:
        extract_path = None
    if extract_path and extract_path != file_path:
        temp_files.append(extract_path)
    cleanup_files(temp_files)

    return {"loaded_rows": len(df)}

@hy.workflow(workflow_arg_schema=ETLContext)
def daily_etl_pipeline():
    """Complete ETL pipeline workflow.

    Workflow arguments follow the ``ETLContext`` schema.
    """
    # Define the DAG
    # NOTE(review): `>>` presumably declares "left task runs before right task"
    # (extract -> transform -> load); confirm against the Hyrex workflow docs.
    extract_data >> transform_data >> load_data

# Schedule daily ETL
@hy.task(cron="0 2 * * *")  # Run at 2 AM daily
def run_daily_etl():
    """Start the ETL workflow for the previous calendar day."""
    previous_day = datetime.now() - timedelta(days=1)

    # Workflow arguments: process yesterday's raw events.
    run_args = ETLContext(
        source_table="raw_events",
        destination_table="processed_events",
        date=previous_day.strftime("%Y-%m-%d"),
    )

    # Send workflow with arguments
    daily_etl_pipeline.send(run_args)

API Rate Limiting

Handle rate-limited APIs gracefully.
from hyrex import HyrexKV
import time
import json

class APICallContext(BaseModel):
    """Arguments passed to the `call_rate_limited_api` task."""

    # Path appended to the API base URL.
    endpoint: str
    # Query parameters sent with the request.
    params: dict
    # Name used in the KV key that holds this API's rate-limit counter.
    api_name: str = "external_api"

@hy.task(
    max_retries=10,
    retry_backoff=True  # Built-in exponential backoff
)
def call_rate_limited_api(context: APICallContext):
    """Call an external API while keeping to a 100-calls-per-hour budget.

    A call counter and its reset time are kept in the KV store under
    ``rate_limit:<api_name>``. When the budget is used up, or the API itself
    answers 429, an exception is raised so the task is retried later.

    Args:
        context: Endpoint path, query parameters, and the API name used for
            the rate-limit key.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the local budget is exhausted or the API responds 429.
        requests.RequestException: If the request fails or times out.

    NOTE(review): the counter is read, incremented, and written back in
    separate steps, so concurrent workers may under-count — confirm whether
    that is acceptable for your limit.
    """

    # Track rate limit using KV store
    rate_key = f"rate_limit:{context.api_name}"

    try:
        rate_data = HyrexKV.get(rate_key)
        rate_info = json.loads(rate_data)
        calls_made = rate_info.get("calls", 0)
        reset_time = rate_info.get("reset_time", 0)
    except Exception:
        # No usable counter stored yet: start a fresh window. `Exception`
        # (not a bare except) so KeyboardInterrupt/SystemExit still propagate.
        calls_made = 0
        reset_time = 0

    # Reset counter if hour has passed
    if time.time() > reset_time:
        calls_made = 0
        reset_time = time.time() + 3600  # Reset in 1 hour

    if calls_made >= 100:  # 100 calls per hour limit
        # Wait or retry later
        raise Exception("Rate limit exceeded, will retry")

    # Make API call. The timeout keeps a stalled server from blocking the
    # worker indefinitely; a timeout raises and the task is retried.
    response = requests.get(
        f"https://api.example.com/{context.endpoint}",
        params=context.params,
        timeout=30,
    )

    # Update rate limit counter
    calls_made += 1
    HyrexKV.set(rate_key, json.dumps({
        "calls": calls_made,
        "reset_time": reset_time
    }))

    if response.status_code == 429:
        # Rate limited by API. Retry-After may be an HTTP-date rather than a
        # number of seconds; fall back to the default in that case.
        try:
            retry_after = int(response.headers.get("Retry-After", 60))
        except ValueError:
            retry_after = 60
        raise Exception(f"Rate limited, retry after {retry_after}s")

    return response.json()

Batch Processing

Process large datasets in manageable chunks.
import json

class BatchContext(BaseModel):
    """Arguments passed to the `process_batch` task."""

    # Zero-based index of this batch; used in the KV key for its results.
    batch_id: int
    # Number of batches the dataset was split into.
    total_batches: int
    # The items belonging to this batch.
    items: list[dict]

@hy.task(queue="batch-processing")
def process_batch(context: BatchContext):
    """Process every item of one batch and store the results in the KV store.

    Results are written as JSON under ``batch_results:<batch_id>`` together
    with a ``processed_at`` timestamp.

    Returns:
        dict: The batch id, the number of processed items, and a success flag.
    """
    # Run each item through the processor, keeping input order.
    outcomes = [process_item(entry) for entry in context.items]

    # Persist the outcomes so they can be collected later.
    payload = json.dumps({
        "results": outcomes,
        "processed_at": datetime.now().isoformat()
    })
    HyrexKV.set(f"batch_results:{context.batch_id}", payload)

    summary = {
        "batch_id": context.batch_id,
        "processed": len(outcomes),
        "success": True
    }
    return summary

def process_large_dataset(items: list[dict], batch_size: int = 100):
    """Split a large dataset into batches and queue one task per batch.

    Args:
        items: The full list of items to process.
        batch_size: Maximum number of items per batch; must be positive.

    Returns:
        list: One queued task handle per batch, in batch order. Empty when
        ``items`` is empty.

    Raises:
        ValueError: If ``batch_size`` is zero or negative. (Zero used to fail
            with ZeroDivisionError, and a negative size silently queued
            nothing.)
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    # Ceiling division: a final partial batch still counts as a batch.
    total_batches = (len(items) + batch_size - 1) // batch_size

    batches = []
    for batch_id, start in enumerate(range(0, len(items), batch_size)):
        # Send each batch as a separate task
        task = process_batch.send(BatchContext(
            batch_id=batch_id,
            total_batches=total_batches,
            items=items[start:start + batch_size]
        ))
        batches.append(task)

    return batches

# Helper to retrieve all batch results
def get_batch_results(total_batches: int) -> list:
    """Retrieve and combine the stored results of batches ``0..total_batches-1``.

    Each batch is read from the KV store key ``batch_results:<batch_id>``.
    Batches whose results cannot be read or decoded are skipped, so the
    returned list may be partial.

    Args:
        total_batches: Number of batches to look up.

    Returns:
        list: The concatenated ``results`` of every readable batch, in batch
        order.
    """
    all_results = []

    for batch_id in range(total_batches):
        try:
            batch_data = HyrexKV.get(f"batch_results:{batch_id}")
            batch_info = json.loads(batch_data)
            all_results.extend(batch_info["results"])
        except Exception:
            # Batch not yet processed (or its entry is unreadable): skip it.
            # `Exception` rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed.
            continue

    return all_results

Error Handling and Monitoring

Implement comprehensive error handling and monitoring.
import traceback
import logging
from hyrex import get_hyrex_context

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

class MonitoredTaskContext(BaseModel):
    """Arguments passed to the `monitored_task` task."""

    # Operation name; passed to `perform_operation` and used in the KV keys.
    operation: str
    # Payload passed to `perform_operation` along with the operation name.
    data: dict

@hy.task(max_retries=3)
def monitored_task(context: MonitoredTaskContext):
    """Run an operation while recording run counts, timing, and failures.

    Bookkeeping is written to the KV store: a per-operation run counter
    (``task_count:<operation>``), per-task success metrics
    (``task_metrics:<operation>:<task_id>``), and an error record for
    unexpected failures (``task_error:<operation>:<timestamp>``).

    Args:
        context: Operation name and its payload.

    Returns:
        dict: ``{"success": True, "result": ..., "duration": ...}`` on success,
        or ``{"success": False, "error": ..., "details": ...}`` for validation
        errors and external API errors below 500, which are reported rather
        than retried.

    Raises:
        ExternalAPIError: Re-raised when its ``status_code`` is 500 or higher,
            so the task is retried.
        Exception: Any other unexpected error is logged, recorded, and
            re-raised so the task is retried.
    """

    start_time = time.time()
    task_context = get_hyrex_context()

    try:
        # Track task execution count
        try:
            count_data = HyrexKV.get(f"task_count:{context.operation}")
            count = json.loads(count_data)["count"]
        except Exception:
            # No usable counter stored yet: start from zero. `Exception`
            # (not a bare except) so KeyboardInterrupt/SystemExit propagate.
            count = 0

        HyrexKV.set(
            f"task_count:{context.operation}",
            json.dumps({"count": count + 1, "last_run": datetime.now().isoformat()})
        )

        # Perform operation
        result = perform_operation(context.operation, context.data)

        # Record success metrics
        duration = time.time() - start_time
        HyrexKV.set(
            f"task_metrics:{context.operation}:{task_context.task_id if task_context else 'unknown'}",
            json.dumps({
                "duration": duration,
                "status": "success",
                "timestamp": datetime.now().isoformat()
            })
        )

        return {
            "success": True,
            "result": result,
            "duration": duration
        }

    except ValidationError as e:
        # Don't retry validation errors
        return {
            "success": False,
            "error": "validation_error",
            "details": str(e)
        }

    except ExternalAPIError as e:
        # Retry external errors
        if e.status_code >= 500:
            raise  # Will trigger retry
        else:
            return {
                "success": False,
                "error": "api_error",
                "details": str(e)
            }

    except Exception as e:
        # Log unexpected errors
        logger.error(f"Task failed: {context.operation}", exc_info=True)

        # Store error for debugging. This is best-effort: if the record cannot
        # be written (KV failure, or a context that is not JSON-serializable),
        # that secondary failure must not replace the original exception.
        error_key = f"task_error:{context.operation}:{time.time()}"
        try:
            HyrexKV.set(
                error_key,
                json.dumps({
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "context": context.dict(),
                    "traceback": traceback.format_exc(),
                    "attempt": task_context.attempt_number if task_context else 0,
                    "timestamp": datetime.now().isoformat()
                })
            )
        except Exception:
            logger.warning("Could not store error record %s", error_key, exc_info=True)

        raise  # Will trigger retry

Next Steps