Create robust ETL (Extract, Transform, Load) pipelines with workflow orchestration, data validation, and error recovery.

Overview

This example demonstrates comprehensive ETL pipeline patterns:
  • Extract phase - Pull data from multiple sources (databases, APIs, files)
  • Transform phase - Clean, validate, and transform data
  • Load phase - Insert processed data into destination systems
  • Workflow orchestration - Chain tasks with dependency management
  • Error recovery - Handle failures with rollback and retry mechanisms
  • Scheduled execution - Run pipelines on cron schedules

Perfect for data warehousing, analytics, reporting, or any data integration needs.

Task Definitions

from hyrex import HyrexRegistry, HyrexKV, get_hyrex_context
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Any, Dict, List
import json
import os

import pandas as pd
import psycopg2
import requests  # available for API-based extract sources

hy = HyrexRegistry()

class ETLContext(BaseModel):
    pipeline_id: str
    source_table: str
    destination_table: str
    date: str
    batch_size: int = 1000

@hy.task(timeout_seconds=1800)  # 30 minutes
def extract_data(config: ETLContext):
    """Extract data from source database"""
    ctx = get_hyrex_context()
    
    # Connect to source database
    source_conn = psycopg2.connect(os.environ["SOURCE_DATABASE_URL"])  # fail fast if unset
    
    try:
        # Table names cannot be passed as SQL parameters, so the table name must
        # come from trusted pipeline config; the date value is parameterized.
        query = f"""
        SELECT * FROM {config.source_table}
        WHERE created_date = %s
        ORDER BY id
        """
        
        # Use pandas for efficient data handling
        df = pd.read_sql_query(query, source_conn, params=[config.date])
        
        if df.empty:
            return {
                "status": "no_data",
                "message": f"No data found for date {config.date}",
                "row_count": 0
            }
        
        # Save to temporary parquet file
        temp_file = f"/tmp/extract_{config.pipeline_id}_{config.date}.parquet"
        df.to_parquet(temp_file, compression='snappy')
        
        # Store file path for next task
        HyrexKV.set(
            f"pipeline-{config.pipeline_id}-extract", 
            temp_file,
            expiry_seconds=7200  # 2 hours
        )
        
        # Store metadata
        metadata = {
            "source_table": config.source_table,
            "row_count": len(df),
            "columns": list(df.columns),
            "date_range": {
                "min": str(df['created_date'].min()) if 'created_date' in df.columns else None,
                "max": str(df['created_date'].max()) if 'created_date' in df.columns else None
            },
            "file_path": temp_file,
            "extracted_at": datetime.now().isoformat()
        }
        
        HyrexKV.set(
            f"pipeline-{config.pipeline_id}-metadata",
            json.dumps(metadata),
            expiry_seconds=7200
        )
        
        return {
            "status": "success",
            "row_count": len(df),
            "file_path": temp_file,
            "metadata": metadata
        }
        
    finally:
        source_conn.close()

@hy.task(timeout_seconds=2400)  # 40 minutes
def transform_data(config: ETLContext):
    """Transform extracted data with validation and cleaning"""
    ctx = get_hyrex_context()
    
    # Get file path from previous task
    temp_file = HyrexKV.get(f"pipeline-{config.pipeline_id}-extract")
    if not temp_file:
        raise Exception("Extract data not found - pipeline may have expired")
    
    # Load data
    df = pd.read_parquet(temp_file)
    original_count = len(df)
    
    # Data validation and cleaning
    transform_log = []
    
    # 1. Remove duplicates
    duplicate_count = df.duplicated().sum()
    if duplicate_count > 0:
        df = df.drop_duplicates()
        transform_log.append(f"Removed {duplicate_count} duplicate rows")
    
    # 2. Handle missing values
    null_counts = df.isnull().sum()
    for column, null_count in null_counts.items():
        if null_count > 0:
            if column in ['email', 'user_id']:  # Critical fields
                df = df.dropna(subset=[column])
                transform_log.append(f"Dropped {null_count} rows with null {column}")
            else:  # Non-critical fields
                df[column] = df[column].fillna('Unknown')
                transform_log.append(f"Filled {null_count} null values in {column}")
    
    # 3. Data type conversions
    if 'created_date' in df.columns:
        df['created_date'] = pd.to_datetime(df['created_date'])
    
    if 'amount' in df.columns:
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
        # Drop rows with invalid amounts
        invalid_amount_count = df['amount'].isnull().sum()
        if invalid_amount_count > 0:
            df = df.dropna(subset=['amount'])
            transform_log.append(f"Dropped {invalid_amount_count} rows with invalid amounts")
    
    # 4. Business logic transformations
    if 'status' in df.columns:
        # Standardize status values
        status_mapping = {
            'active': 'ACTIVE',
            'inactive': 'INACTIVE', 
            'pending': 'PENDING',
            'cancelled': 'CANCELLED'
        }
        df['status'] = df['status'].str.lower().map(status_mapping).fillna('UNKNOWN')
        transform_log.append("Standardized status values")
    
    # 5. Add derived columns
    df['processed_at'] = datetime.now()
    df['pipeline_id'] = config.pipeline_id
    df['data_quality_score'] = calculate_data_quality_score(df)
    
    # Save transformed data
    output_file = temp_file.replace("extract_", "transform_")
    df.to_parquet(output_file, compression='snappy')
    
    # Store for next task
    HyrexKV.set(
        f"pipeline-{config.pipeline_id}-transform", 
        output_file,
        expiry_seconds=7200
    )
    
    transform_summary = {
        "original_rows": original_count,
        "final_rows": len(df),
        "rows_removed": original_count - len(df),
        "transform_log": transform_log,
        "quality_metrics": {
            "completeness": (df.count().sum() / (len(df) * len(df.columns)) * 100) if len(df) else 0.0,
            "avg_quality_score": df['data_quality_score'].mean() if len(df) else 0.0
        },
        "file_path": output_file,
        "transformed_at": datetime.now().isoformat()
    }
    
    return {
        "status": "success",
        "summary": transform_summary
    }

@hy.task(timeout_seconds=1800)  # 30 minutes  
def load_data(config: ETLContext):
    """Load transformed data to destination"""
    ctx = get_hyrex_context()
    
    # Get transformed data file
    output_file = HyrexKV.get(f"pipeline-{config.pipeline_id}-transform")
    if not output_file:
        raise Exception("Transform data not found")
    
    # Load transformed data
    df = pd.read_parquet(output_file)
    
    # Connect to destination database
    dest_conn = psycopg2.connect(os.environ["DESTINATION_DATABASE_URL"])  # fail fast if unset
    
    try:
        cursor = dest_conn.cursor()
        
        # Create a staging table; sanitize the pipeline id so the name is a valid SQL identifier
        staging_table = f"{config.destination_table}_staging_{config.pipeline_id}".replace('-', '_')
        create_staging_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {staging_table} 
        AS SELECT * FROM {config.destination_table} WHERE 1=0
        """
        cursor.execute(create_staging_table_sql)
        
        # Insert data in batches
        total_rows = len(df)
        loaded_rows = 0
        
        for i in range(0, total_rows, config.batch_size):
            batch_df = df.iloc[i:i + config.batch_size]
            
            # Convert to records for insertion
            records = batch_df.to_dict('records')
            
            # Prepare bulk insert
            columns = list(batch_df.columns)
            placeholders = ', '.join(['%s'] * len(columns))
            columns_str = ', '.join(columns)
            
            insert_sql = f"""
            INSERT INTO {staging_table} ({columns_str}) 
            VALUES ({placeholders})
            """
            
            # Execute batch insert
            cursor.executemany(insert_sql, [tuple(record.values()) for record in records])
            loaded_rows += len(batch_df)
            
            # Update progress
            progress = (loaded_rows / total_rows) * 100
            print(f"Loading progress: {progress:.1f}% ({loaded_rows}/{total_rows})")
        
        # Data quality checks on staging table
        cursor.execute(f"SELECT COUNT(*) FROM {staging_table}")
        staging_count = cursor.fetchone()[0]
        
        if staging_count != total_rows:
            raise Exception(f"Row count mismatch: expected {total_rows}, got {staging_count}")
        
        # Swap staging table with main table. psycopg2 runs these statements
        # inside the already-open transaction, so the swap commits atomically.
        cursor.execute(f"DROP TABLE IF EXISTS {config.destination_table}_old")
        cursor.execute(f"ALTER TABLE {config.destination_table} RENAME TO {config.destination_table}_old")
        cursor.execute(f"ALTER TABLE {staging_table} RENAME TO {config.destination_table}")
        dest_conn.commit()
        
        # Clean up old table
        cursor.execute(f"DROP TABLE IF EXISTS {config.destination_table}_old")
        dest_conn.commit()
        
        return {
            "status": "success",
            "loaded_rows": loaded_rows,
            "destination_table": config.destination_table,
            "loaded_at": datetime.now().isoformat()
        }
        
    except Exception:
        dest_conn.rollback()
        # Clean up staging table on error
        try:
            cursor.execute(f"DROP TABLE IF EXISTS {staging_table}")
            dest_conn.commit()
        except Exception:
            pass
        raise
    
    finally:
        dest_conn.close()
        # Clean up temp files
        cleanup_temp_files(config.pipeline_id)

@hy.workflow(workflow_arg_schema=ETLContext)
def daily_etl_pipeline():
    """Complete ETL pipeline workflow"""
    # Define the DAG: Extract -> Transform -> Load
    extract_data >> transform_data >> load_data

@hy.task
def cleanup_temp_files(pipeline_id: str):
    """Clean up temporary files after pipeline completion"""
    import glob
    
    temp_files = glob.glob(f"/tmp/*{pipeline_id}*")
    removed_files = []
    
    for file_path in temp_files:
        try:
            os.remove(file_path)
            removed_files.append(file_path)
        except OSError:
            pass
    
    # Clean up KV store entries
    keys_to_remove = [
        f"pipeline-{pipeline_id}-extract",
        f"pipeline-{pipeline_id}-transform", 
        f"pipeline-{pipeline_id}-metadata"
    ]
    
    for key in keys_to_remove:
        try:
            HyrexKV.delete(key)
        except Exception:
            pass
    
    return {"removed_files": removed_files, "cleaned_keys": keys_to_remove}

def calculate_data_quality_score(df: pd.DataFrame) -> pd.Series:
    """Calculate data quality score for each row"""
    scores = []
    
    for _, row in df.iterrows():
        score = 0
        total_checks = 0
        
        # Completeness check (non-null values)
        non_null_ratio = row.notna().sum() / len(row)
        score += non_null_ratio * 40  # 40% weight
        total_checks += 40
        
        # Validity checks (business rules)
        if 'email' in row and pd.notna(row['email']):
            if '@' in str(row['email']):
                score += 20
            total_checks += 20
        
        if 'amount' in row and pd.notna(row['amount']):
            if row['amount'] >= 0:
                score += 20
            total_checks += 20
        
        # Consistency checks
        if 'created_date' in row and pd.notna(row['created_date']):
            if row['created_date'] <= datetime.now():
                score += 20
            total_checks += 20
        
        scores.append((score / total_checks) * 100 if total_checks else 0.0)
    
    # Align with df's index so the scores assign cleanly after rows were dropped
    return pd.Series(scores, index=df.index)
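
# Note: the row-by-row loop above is easy to read but slow on large frames.
# As an illustrative alternative (not used by the pipeline above), the
# completeness component can be computed vectorized:

def completeness_ratio(df: pd.DataFrame) -> pd.Series:
    """Vectorized fraction of non-null cells per row."""
    return df.notna().sum(axis=1) / df.shape[1]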

# Schedule daily ETL
@hy.cron("0 2 * * *")  # Run at 2 AM daily
def run_daily_etl():
    """Schedule daily ETL pipeline execution"""
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    pipeline_id = f"daily_{yesterday}_{int(datetime.now().timestamp())}"
    
    # Send workflow with arguments
    workflow_run = daily_etl_pipeline.send(ETLContext(
        pipeline_id=pipeline_id,
        source_table="raw_events",
        destination_table="processed_events", 
        date=yesterday
    ))
    
    return {
        "pipeline_id": pipeline_id,
        "workflow_run_id": workflow_run.workflow_run_id,
        "date": yesterday,
        "started_at": datetime.now().isoformat()
    }

Usage Examples

Manual Pipeline Execution

# Trigger ETL pipeline for specific date
curl -X POST http://localhost:8000/etl/run \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline_id": "manual_2024_01_15",
    "source_table": "raw_transactions", 
    "destination_table": "clean_transactions",
    "date": "2024-01-15",
    "batch_size": 500
  }'

# Check pipeline status
curl http://localhost:8000/etl/status/manual_2024_01_15
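
Note that Hyrex does not ship these HTTP endpoints; the curl calls above assume a thin API layer of your own in front of the workflow. A minimal sketch of such a layer with FastAPI (the route paths, the importable module name, and the status lookup against the metadata key written by extract_data are all assumptions for illustration):

import json

from fastapi import FastAPI, HTTPException
from hyrex import HyrexKV

# Assumes the task definitions above live in an importable module, e.g. etl_tasks
from etl_tasks import ETLContext, daily_etl_pipeline

app = FastAPI()

@app.post("/etl/run")
def run_etl(config: ETLContext):
    # Enqueue the workflow with the validated config
    workflow_run = daily_etl_pipeline.send(config)
    return {
        "pipeline_id": config.pipeline_id,
        "workflow_run_id": workflow_run.workflow_run_id,
    }

@app.get("/etl/status/{pipeline_id}")
def etl_status(pipeline_id: str):
    # Illustrative status check against the extract metadata stored in HyrexKV
    metadata = HyrexKV.get(f"pipeline-{pipeline_id}-metadata")
    if not metadata:
        raise HTTPException(status_code=404, detail="Pipeline not found or expired")
    return json.loads(metadata)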

Pipeline Monitoring

@hy.task
def monitor_pipeline_health():
    """Monitor ETL pipeline performance"""
    
    # Check recent pipeline runs. get_recent_pipeline_runs and send_alert are
    # placeholders for your own run-history store and alerting hook.
    recent_runs = get_recent_pipeline_runs(days=7)
    
    if not recent_runs:
        return {"total_runs": 0}
    
    metrics = {
        "total_runs": len(recent_runs),
        "success_rate": sum(1 for run in recent_runs if run['status'] == 'success') / len(recent_runs) * 100,
        "avg_duration": sum(run['duration'] for run in recent_runs) / len(recent_runs),
        "data_volume": sum(run.get('row_count', 0) for run in recent_runs)
    }
    
    # Alert if the success rate falls below the threshold
    if metrics['success_rate'] < 95:
        send_alert("ETL pipeline success rate below 95%", metrics)
    
    return metrics

Production Considerations

  • Data lineage: Track data flow and transformations for compliance
  • Incremental processing: Process only changed/new data for efficiency (see the watermark sketch after this list)
  • Schema evolution: Handle source schema changes gracefully
  • Data quality monitoring: Implement comprehensive data validation
  • Resource management: Monitor memory and CPU usage for large datasets
  • Backup and recovery: Implement rollback mechanisms for failed loads
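
For incremental processing, one common approach is a per-table watermark: store the last successfully loaded date and resume from it on the next run. A minimal sketch using the same HyrexKV store (the watermark key scheme and both helper functions are assumptions for illustration):

from datetime import datetime, timedelta

from hyrex import HyrexKV

def next_incremental_date(source_table: str) -> str:
    """Return the next unprocessed date for a source table (hypothetical helper)."""
    watermark = HyrexKV.get(f"watermark-{source_table}")  # assumed key scheme
    if watermark:
        last = datetime.strptime(watermark, "%Y-%m-%d")
        return (last + timedelta(days=1)).strftime("%Y-%m-%d")
    # No watermark yet: start with yesterday's partition
    return (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

def advance_watermark(source_table: str, processed_date: str) -> None:
    """Record a successfully loaded date so reruns skip already-processed data."""
    HyrexKV.set(f"watermark-{source_table}", processed_date)

With helpers like these, run_daily_etl could call next_incremental_date instead of hard-coding yesterday's date, and advance_watermark after a successful load.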

Next Steps

Batch Processing

Handle large dataset processing

Error Handling

Implement robust error recovery