Create robust ETL (Extract, Transform, Load) pipelines with workflow orchestration, data validation, and error recovery.

Overview

This example demonstrates comprehensive ETL pipeline patterns:
  • Extract phase - Pull data from multiple sources (databases, APIs, files)
  • Transform phase - Clean, validate, and transform data
  • Load phase - Insert processed data into destination systems
  • Workflow orchestration - Chain tasks with dependency management
  • Error recovery - Handle failures with rollback and retry mechanisms
  • Scheduled execution - Run pipelines on cron schedules
Perfect for data warehousing, analytics, reporting, or any other data integration need. The pieces fit together as sketched below.
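
A minimal skeleton of the pattern, mirroring the Hyrex APIs used in the full example below (task registration, >> chaining inside a workflow, and a cron trigger). PipelineArgs and the literal date are placeholders for this sketch:

from hyrex import HyrexRegistry
from pydantic import BaseModel

hy = HyrexRegistry()

class PipelineArgs(BaseModel):
    date: str

@hy.task
def extract(args: PipelineArgs):
    ...  # pull data from the source

@hy.task
def transform(args: PipelineArgs):
    ...  # clean and validate

@hy.task
def load(args: PipelineArgs):
    ...  # write to the destination

@hy.workflow(workflow_arg_schema=PipelineArgs)
def etl_pipeline():
    # Extract -> Transform -> Load, expressed as a DAG
    extract >> transform >> load

@hy.cron("0 2 * * *")  # nightly at 2 AM
def schedule_etl():
    etl_pipeline.send(PipelineArgs(date="2024-01-15"))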

Task Definitions

from hyrex import HyrexRegistry, HyrexKV, get_hyrex_context
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import List, Dict, Any
import json
import os

import numpy as np
import pandas as pd
import psycopg2
import requests

hy = HyrexRegistry()

class ETLContext(BaseModel):
    pipeline_id: str
    source_table: str
    destination_table: str
    date: str
    batch_size: int = 1000

@hy.task(timeout_seconds=1800)  # 30 minutes
def extract_data(config: ETLContext):
    """Extract data from source database"""
    ctx = get_hyrex_context()
    
    # Connect to source database
    source_conn = psycopg2.connect(os.environ.get('SOURCE_DATABASE_URL'))
    
    try:
        query = f"""
        SELECT * FROM {config.source_table} 
        WHERE created_date = %s 
        ORDER BY id
        """
        
        # Use pandas for efficient data handling
        df = pd.read_sql_query(query, source_conn, params=[config.date])
        
        if df.empty:
            return {
                "status": "no_data",
                "message": f"No data found for date {config.date}",
                "row_count": 0
            }
        
        # Save to temporary parquet file
        temp_file = f"/tmp/extract_{config.pipeline_id}_{config.date}.parquet"
        df.to_parquet(temp_file, compression='snappy')
        
        # Store file path for next task
        HyrexKV.set(
            f"pipeline-{config.pipeline_id}-extract", 
            temp_file,
            expiry_seconds=7200  # 2 hours
        )
        
        # Store metadata
        metadata = {
            "source_table": config.source_table,
            "row_count": len(df),
            "columns": list(df.columns),
            "date_range": {
                "min": df['created_date'].min().isoformat() if 'created_date' in df else None,
                "max": df['created_date'].max().isoformat() if 'created_date' in df else None
            },
            "file_path": temp_file,
            "extracted_at": datetime.now().isoformat()
        }
        
        HyrexKV.set(
            f"pipeline-{config.pipeline_id}-metadata",
            json.dumps(metadata),
            expiry_seconds=7200
        )
        
        return {
            "status": "success",
            "row_count": len(df),
            "file_path": temp_file,
            "metadata": metadata
        }
        
    finally:
        source_conn.close()

@hy.task(timeout_seconds=2400)  # 40 minutes
def transform_data(config: ETLContext):
    """Transform extracted data with validation and cleaning"""
    ctx = get_hyrex_context()
    
    # Get file path from previous task
    temp_file = HyrexKV.get(f"pipeline-{config.pipeline_id}-extract")
    if not temp_file:
        raise Exception("Extract data not found - pipeline may have expired")
    
    # Load data
    df = pd.read_parquet(temp_file)
    original_count = len(df)
    
    # Data validation and cleaning
    transform_log = []
    
    # 1. Remove duplicates
    duplicate_count = df.duplicated().sum()
    if duplicate_count > 0:
        df = df.drop_duplicates()
        transform_log.append(f"Removed {duplicate_count} duplicate rows")
    
    # 2. Handle missing values
    null_counts = df.isnull().sum()
    for column, null_count in null_counts.items():
        if null_count > 0:
            if column in ['email', 'user_id']:  # Critical fields
                df = df.dropna(subset=[column])
                transform_log.append(f"Dropped {null_count} rows with null {column}")
            else:  # Non-critical fields
                df[column] = df[column].fillna('Unknown')
                transform_log.append(f"Filled {null_count} null values in {column}")
    
    # 3. Data type conversions
    if 'created_date' in df.columns:
        df['created_date'] = pd.to_datetime(df['created_date'])
    
    if 'amount' in df.columns:
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
        # Drop rows with invalid amounts
        invalid_amount_count = df['amount'].isnull().sum()
        if invalid_amount_count > 0:
            df = df.dropna(subset=['amount'])
            transform_log.append(f"Dropped {invalid_amount_count} rows with invalid amounts")
    
    # 4. Business logic transformations
    if 'status' in df.columns:
        # Standardize status values
        status_mapping = {
            'active': 'ACTIVE',
            'inactive': 'INACTIVE', 
            'pending': 'PENDING',
            'cancelled': 'CANCELLED'
        }
        df['status'] = df['status'].str.lower().map(status_mapping).fillna('UNKNOWN')
        transform_log.append("Standardized status values")
    
    # 5. Add derived columns
    df['processed_at'] = datetime.now()
    df['pipeline_id'] = config.pipeline_id
    df['data_quality_score'] = calculate_data_quality_score(df)
    
    # Save transformed data
    output_file = temp_file.replace("extract_", "transform_")
    df.to_parquet(output_file, compression='snappy')
    
    # Store for next task
    HyrexKV.set(
        f"pipeline-{config.pipeline_id}-transform", 
        output_file,
        expiry_seconds=7200
    )
    
    transform_summary = {
        "original_rows": original_count,
        "final_rows": len(df),
        "rows_removed": original_count - len(df),
        "transform_log": transform_log,
        "quality_metrics": {
            "completeness": (df.count().sum() / (len(df) * len(df.columns))) * 100,
            "avg_quality_score": df['data_quality_score'].mean()
        },
        "file_path": output_file,
        "transformed_at": datetime.now().isoformat()
    }
    
    return {
        "status": "success",
        "summary": transform_summary
    }

@hy.task(timeout_seconds=1800)  # 30 minutes  
def load_data(config: ETLContext):
    """Load transformed data to destination"""
    ctx = get_hyrex_context()
    
    # Get transformed data file
    output_file = HyrexKV.get(f"pipeline-{config.pipeline_id}-transform")
    if not output_file:
        raise Exception("Transform data not found")
    
    # Load transformed data
    df = pd.read_parquet(output_file)
    
    # Connect to destination database
    dest_conn = psycopg2.connect(os.environ.get('DESTINATION_DATABASE_URL'))
    
    try:
        cursor = dest_conn.cursor()
        
        # Create staging table
        staging_table = f"{config.destination_table}_staging_{config.pipeline_id}"
        create_staging_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {staging_table} 
        AS SELECT * FROM {config.destination_table} WHERE 1=0
        """
        cursor.execute(create_staging_table_sql)
        
        # Insert data in batches
        total_rows = len(df)
        loaded_rows = 0
        
        for i in range(0, total_rows, config.batch_size):
            batch_df = df.iloc[i:i + config.batch_size]
            
            # Convert to records for insertion
            records = batch_df.to_dict('records')
            
            # Prepare bulk insert
            columns = list(batch_df.columns)
            placeholders = ', '.join(['%s'] * len(columns))
            columns_str = ', '.join(columns)
            
            insert_sql = f"""
            INSERT INTO {staging_table} ({columns_str}) 
            VALUES ({placeholders})
            """
            
            # Execute batch insert, converting NaN/NaT to NULL and numpy scalars
            # to native Python values that psycopg2 can adapt
            rows = [
                tuple(
                    None if pd.isna(value)
                    else value.item() if isinstance(value, np.generic)
                    else value
                    for value in record.values()
                )
                for record in records
            ]
            cursor.executemany(insert_sql, rows)
            loaded_rows += len(batch_df)
            
            # Update progress
            progress = (loaded_rows / total_rows) * 100
            print(f"Loading progress: {progress:.1f}% ({loaded_rows}/{total_rows})")
        
        # Data quality checks on staging table
        cursor.execute(f"SELECT COUNT(*) FROM {staging_table}")
        staging_count = cursor.fetchone()[0]
        
        if staging_count != total_rows:
            raise Exception(f"Row count mismatch: expected {total_rows}, got {staging_count}")
        
        # Swap the staging table in for the main table; psycopg2 already has a
        # transaction open, so the renames commit atomically with the inserts
        cursor.execute(f"DROP TABLE IF EXISTS {config.destination_table}_old")
        cursor.execute(f"ALTER TABLE {config.destination_table} RENAME TO {config.destination_table}_old")
        cursor.execute(f"ALTER TABLE {staging_table} RENAME TO {config.destination_table}")
        dest_conn.commit()
        
        # Clean up the old table in a follow-up transaction
        cursor.execute(f"DROP TABLE {config.destination_table}_old")
        dest_conn.commit()
        
        return {
            "status": "success",
            "loaded_rows": loaded_rows,
            "destination_table": config.destination_table,
            "loaded_at": datetime.now().isoformat()
        }
        
    except Exception:
        dest_conn.rollback()
        # Clean up the staging table on error
        try:
            cursor.execute(f"DROP TABLE IF EXISTS {staging_table}")
            dest_conn.commit()
        except Exception:
            pass
        raise
    
    finally:
        dest_conn.close()
        # Clean up temp files
        cleanup_temp_files(config.pipeline_id)

@hy.workflow(workflow_arg_schema=ETLContext)
def daily_etl_pipeline():
    """Complete ETL pipeline workflow"""
    # Define the DAG: Extract -> Transform -> Load
    extract_data >> transform_data >> load_data

@hy.task
def cleanup_temp_files(pipeline_id: str):
    """Clean up temporary files after pipeline completion"""
    import glob
    
    temp_files = glob.glob(f"/tmp/*{pipeline_id}*")
    removed_files = []
    
    for file_path in temp_files:
        try:
            os.remove(file_path)
            removed_files.append(file_path)
        except OSError:
            pass
    
    # Clean up KV store entries
    keys_to_remove = [
        f"pipeline-{pipeline_id}-extract",
        f"pipeline-{pipeline_id}-transform", 
        f"pipeline-{pipeline_id}-metadata"
    ]
    
    for key in keys_to_remove:
        try:
            HyrexKV.delete(key)
        except Exception:
            pass
    
    return {"removed_files": removed_files, "cleaned_keys": keys_to_remove}

def calculate_data_quality_score(df: pd.DataFrame) -> pd.Series:
    """Calculate data quality score for each row"""
    scores = []
    
    for _, row in df.iterrows():
        score = 0
        total_checks = 0
        
        # Completeness check (non-null values)
        non_null_ratio = row.notna().sum() / len(row)
        score += non_null_ratio * 40  # 40% weight
        total_checks += 40
        
        # Validity checks (business rules)
        if 'email' in row and pd.notna(row['email']):
            if '@' in str(row['email']):
                score += 20
            total_checks += 20
        
        if 'amount' in row and pd.notna(row['amount']):
            if row['amount'] >= 0:
                score += 20
            total_checks += 20
        
        # Consistency checks
        if 'created_date' in row and pd.notna(row['created_date']):
            if row['created_date'] <= datetime.now():
                score += 20
            total_checks += 20
        
        scores.append(score if total_checks == 0 else (score / total_checks) * 100)
    
    # Align with the caller's index so assignment back to the DataFrame is correct
    return pd.Series(scores, index=df.index)

# Schedule daily ETL
@hy.cron("0 2 * * *")  # Run at 2 AM daily
def run_daily_etl():
    """Schedule daily ETL pipeline execution"""
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    # Keep the pipeline id SQL-safe: it becomes part of the staging table name
    pipeline_id = f"daily_{yesterday.replace('-', '_')}_{int(datetime.now().timestamp())}"
    
    # Send workflow with arguments
    workflow_run = daily_etl_pipeline.send(ETLContext(
        pipeline_id=pipeline_id,
        source_table="raw_events",
        destination_table="processed_events", 
        date=yesterday
    ))
    
    return {
        "pipeline_id": pipeline_id,
        "workflow_run_id": workflow_run.workflow_run_id,
        "date": yesterday,
        "started_at": datetime.now().isoformat()
    }

Usage Examples

Manual Pipeline Execution

# Trigger ETL pipeline for specific date
curl -X POST http://localhost:8000/etl/run \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline_id": "manual_2024_01_15",
    "source_table": "raw_transactions", 
    "destination_table": "clean_transactions",
    "date": "2024-01-15",
    "batch_size": 500
  }'

# Check pipeline status
curl http://localhost:8000/etl/status/manual_2024_01_15
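
The HTTP endpoints above assume an application-specific API layer in front of the pipeline. With the task definitions above imported, the workflow can also be triggered directly from Python using the same .send() call the cron task uses:

from datetime import datetime

# Trigger the ETL workflow directly (same pattern as run_daily_etl above)
workflow_run = daily_etl_pipeline.send(ETLContext(
    pipeline_id=f"manual_{int(datetime.now().timestamp())}",
    source_table="raw_transactions",
    destination_table="clean_transactions",
    date="2024-01-15",
    batch_size=500
))
print(f"Started workflow run {workflow_run.workflow_run_id}")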

Pipeline Monitoring

@hy.task
def monitor_pipeline_health():
    """Monitor ETL pipeline performance"""
    
    # Check recent pipeline runs (helper functions are sketched below)
    recent_runs = get_recent_pipeline_runs(days=7)
    if not recent_runs:
        return {"total_runs": 0, "message": "No recent pipeline runs"}
    
    metrics = {
        "total_runs": len(recent_runs),
        "success_rate": sum(1 for run in recent_runs if run['status'] == 'success') / len(recent_runs) * 100,
        "avg_duration": sum(run['duration'] for run in recent_runs) / len(recent_runs),
        "data_volume": sum(run.get('row_count', 0) for run in recent_runs)
    }
    
    # Alert if success rate below threshold
    if metrics['success_rate'] < 95:
        send_alert("ETL pipeline success rate below 95%", metrics)
    
    return metrics
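
get_recent_pipeline_runs and send_alert are application-specific helpers, not Hyrex APIs. A minimal sketch, assuming run summaries are recorded in a hypothetical pipeline_runs table in the destination database and alerts go to a webhook URL read from ALERT_WEBHOOK_URL:

def get_recent_pipeline_runs(days: int = 7) -> List[Dict[str, Any]]:
    """Fetch recent run summaries from a (hypothetical) pipeline_runs table"""
    cutoff = datetime.now() - timedelta(days=days)
    conn = psycopg2.connect(os.environ.get('DESTINATION_DATABASE_URL'))
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT pipeline_id, status, duration_seconds, row_count
            FROM pipeline_runs
            WHERE started_at >= %s
            """,
            (cutoff,)
        )
        return [
            {"pipeline_id": r[0], "status": r[1], "duration": r[2], "row_count": r[3]}
            for r in cursor.fetchall()
        ]
    finally:
        conn.close()

def send_alert(message: str, details: Dict[str, Any]):
    """Post an alert to a webhook (e.g. Slack); the URL is read from the environment"""
    webhook_url = os.environ.get('ALERT_WEBHOOK_URL')
    if webhook_url:
        requests.post(webhook_url, json={"text": message, "details": details}, timeout=10)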

Production Considerations

  • Data lineage: Track data flow and transformations for compliance
  • Incremental processing: Process only changed/new data for efficiency (see the watermark sketch after this list)
  • Schema evolution: Handle source schema changes gracefully
  • Data quality monitoring: Implement comprehensive data validation
  • Resource management: Monitor memory and CPU usage for large datasets
  • Backup and recovery: Implement rollback mechanisms for failed loads
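
As one example of incremental processing, extraction can track a high-watermark in HyrexKV so each run pulls only rows changed since the previous one. This is a sketch: the watermark key name and the updated_at column are assumptions, not part of the pipeline above.

@hy.task(timeout_seconds=1800)
def extract_incremental(config: ETLContext):
    """Extract only rows changed since the last successful extract (sketch)"""
    # Last processed watermark; the key name is an assumption for this sketch
    watermark_key = f"watermark-{config.source_table}"
    last_watermark = HyrexKV.get(watermark_key) or "1970-01-01T00:00:00"

    source_conn = psycopg2.connect(os.environ.get('SOURCE_DATABASE_URL'))
    try:
        # Assumes the source table has an updated_at column
        query = f"""
        SELECT * FROM {config.source_table}
        WHERE updated_at > %s
        ORDER BY updated_at
        """
        df = pd.read_sql_query(query, source_conn, params=[last_watermark])

        if not df.empty:
            temp_file = f"/tmp/extract_{config.pipeline_id}_incremental.parquet"
            df.to_parquet(temp_file, compression='snappy')
            HyrexKV.set(f"pipeline-{config.pipeline_id}-extract", temp_file, expiry_seconds=7200)

            # Advance the watermark only after the extract is safely persisted
            HyrexKV.set(
                watermark_key,
                df['updated_at'].max().isoformat(),
                expiry_seconds=30 * 24 * 3600  # keep the watermark between runs
            )

        return {"status": "success", "row_count": len(df)}
    finally:
        source_conn.close()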

Next Steps