Overview
This example demonstrates comprehensive ETL pipeline patterns:
- Extract phase - Pull data from multiple sources (databases, APIs, files)
- Transform phase - Clean, validate, and transform data
- Load phase - Insert processed data into destination systems
- Workflow orchestration - Chain tasks with dependency management
- Error recovery - Handle failures with rollback and retry mechanisms
- Scheduled execution - Run pipelines on cron schedules
Task Definitions
from hyrex import HyrexRegistry, HyrexKV, get_hyrex_context
from pydantic import BaseModel
from datetime import datetime, timedelta
import json
import os
import pandas as pd
import psycopg2
import requests
from typing import List, Dict, Any
hy = HyrexRegistry()
class ETLContext(BaseModel):
pipeline_id: str
source_table: str
destination_table: str
date: str
batch_size: int = 1000
@hy.task(timeout_seconds=1800) # 30 minutes
def extract_data(config: ETLContext):
"""Extract data from source database"""
ctx = get_hyrex_context()
# Connect to source database
source_conn = psycopg2.connect(os.environ.get('SOURCE_DATABASE_URL'))
try:
query = f"""
SELECT * FROM {config.source_table}
WHERE created_date = %s
ORDER BY id
"""
# Use pandas for efficient data handling
df = pd.read_sql_query(query, source_conn, params=[config.date])
if df.empty:
return {
"status": "no_data",
"message": f"No data found for date {config.date}",
"row_count": 0
}
# Save to temporary parquet file
temp_file = f"/tmp/extract_{config.pipeline_id}_{config.date}.parquet"
df.to_parquet(temp_file, compression='snappy')
# Store file path for next task
HyrexKV.set(
f"pipeline-{config.pipeline_id}-extract",
temp_file,
expiry_seconds=7200 # 2 hours
)
# Store metadata
metadata = {
"source_table": config.source_table,
"row_count": len(df),
"columns": list(df.columns),
"date_range": {
"min": df['created_date'].min().isoformat() if 'created_date' in df else None,
"max": df['created_date'].max().isoformat() if 'created_date' in df else None
},
"file_path": temp_file,
"extracted_at": datetime.now().isoformat()
}
HyrexKV.set(
f"pipeline-{config.pipeline_id}-metadata",
json.dumps(metadata),
expiry_seconds=7200
)
return {
"status": "success",
"row_count": len(df),
"file_path": temp_file,
"metadata": metadata
}
finally:
source_conn.close()
@hy.task(timeout_seconds=2400) # 40 minutes
def transform_data(config: ETLContext):
"""Transform extracted data with validation and cleaning"""
ctx = get_hyrex_context()
# Get file path from previous task
temp_file = HyrexKV.get(f"pipeline-{config.pipeline_id}-extract")
if not temp_file:
raise Exception("Extract data not found - pipeline may have expired")
# Load data
df = pd.read_parquet(temp_file)
original_count = len(df)
# Data validation and cleaning
transform_log = []
# 1. Remove duplicates
duplicate_count = df.duplicated().sum()
if duplicate_count > 0:
df = df.drop_duplicates()
transform_log.append(f"Removed {duplicate_count} duplicate rows")
# 2. Handle missing values
null_counts = df.isnull().sum()
for column, null_count in null_counts.items():
if null_count > 0:
if column in ['email', 'user_id']: # Critical fields
df = df.dropna(subset=[column])
transform_log.append(f"Dropped {null_count} rows with null {column}")
else: # Non-critical fields
df[column] = df[column].fillna('Unknown')
transform_log.append(f"Filled {null_count} null values in {column}")
# 3. Data type conversions
if 'created_date' in df.columns:
df['created_date'] = pd.to_datetime(df['created_date'])
if 'amount' in df.columns:
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Drop rows with invalid amounts
invalid_amount_count = df['amount'].isnull().sum()
if invalid_amount_count > 0:
df = df.dropna(subset=['amount'])
transform_log.append(f"Dropped {invalid_amount_count} rows with invalid amounts")
# 4. Business logic transformations
if 'status' in df.columns:
# Standardize status values
status_mapping = {
'active': 'ACTIVE',
'inactive': 'INACTIVE',
'pending': 'PENDING',
'cancelled': 'CANCELLED'
}
df['status'] = df['status'].str.lower().map(status_mapping).fillna('UNKNOWN')
transform_log.append("Standardized status values")
# 5. Add derived columns
df['processed_at'] = datetime.now()
df['pipeline_id'] = config.pipeline_id
df['data_quality_score'] = calculate_data_quality_score(df)
# Save transformed data
output_file = temp_file.replace("extract_", "transform_")
df.to_parquet(output_file, compression='snappy')
# Store for next task
HyrexKV.set(
f"pipeline-{config.pipeline_id}-transform",
output_file,
expiry_seconds=7200
)
transform_summary = {
"original_rows": original_count,
"final_rows": len(df),
"rows_removed": original_count - len(df),
"transform_log": transform_log,
"quality_metrics": {
"completeness": (df.count().sum() / (len(df) * len(df.columns))) * 100,
"avg_quality_score": df['data_quality_score'].mean()
},
"file_path": output_file,
"transformed_at": datetime.now().isoformat()
}
return {
"status": "success",
"summary": transform_summary
}
@hy.task(timeout_seconds=1800) # 30 minutes
def load_data(config: ETLContext):
"""Load transformed data to destination"""
ctx = get_hyrex_context()
# Get transformed data file
output_file = HyrexKV.get(f"pipeline-{config.pipeline_id}-transform")
if not output_file:
raise Exception("Transform data not found")
# Load transformed data
df = pd.read_parquet(output_file)
# Connect to destination database
dest_conn = psycopg2.connect(os.environ.get('DESTINATION_DATABASE_URL'))
try:
cursor = dest_conn.cursor()
# Create staging table
staging_table = f"{config.destination_table}_staging_{config.pipeline_id}"
create_staging_table_sql = f"""
CREATE TABLE IF NOT EXISTS {staging_table}
AS SELECT * FROM {config.destination_table} WHERE 1=0
"""
cursor.execute(create_staging_table_sql)
# Insert data in batches
total_rows = len(df)
loaded_rows = 0
for i in range(0, total_rows, config.batch_size):
batch_df = df.iloc[i:i + config.batch_size]
# Convert to records for insertion
records = batch_df.to_dict('records')
# Prepare bulk insert
columns = list(batch_df.columns)
placeholders = ', '.join(['%s'] * len(columns))
columns_str = ', '.join(columns)
insert_sql = f"""
INSERT INTO {staging_table} ({columns_str})
VALUES ({placeholders})
"""
# Execute batch insert
cursor.executemany(insert_sql, [tuple(record.values()) for record in records])
loaded_rows += len(batch_df)
# Update progress
progress = (loaded_rows / total_rows) * 100
print(f"Loading progress: {progress:.1f}% ({loaded_rows}/{total_rows})")
# Data quality checks on staging table
cursor.execute(f"SELECT COUNT(*) FROM {staging_table}")
staging_count = cursor.fetchone()[0]
if staging_count != total_rows:
raise Exception(f"Row count mismatch: expected {total_rows}, got {staging_count}")
# Swap staging table with main table (atomic operation)
cursor.execute(f"BEGIN")
cursor.execute(f"DROP TABLE IF EXISTS {config.destination_table}_old")
cursor.execute(f"ALTER TABLE {config.destination_table} RENAME TO {config.destination_table}_old")
cursor.execute(f"ALTER TABLE {staging_table} RENAME TO {config.destination_table}")
cursor.execute(f"COMMIT")
# Clean up old table
cursor.execute(f"DROP TABLE {config.destination_table}_old")
dest_conn.commit()
return {
"status": "success",
"loaded_rows": loaded_rows,
"destination_table": config.destination_table,
"loaded_at": datetime.now().isoformat()
}
except Exception as e:
dest_conn.rollback()
# Clean up staging table on error
try:
cursor.execute(f"DROP TABLE IF EXISTS {staging_table}")
dest_conn.commit()
except:
pass
raise e
finally:
dest_conn.close()
# Clean up temp files
cleanup_temp_files(config.pipeline_id)
@hy.workflow(workflow_arg_schema=ETLContext)
def daily_etl_pipeline():
"""Complete ETL pipeline workflow"""
# Define the DAG: Extract -> Transform -> Load
extract_data >> transform_data >> load_data
@hy.task
def cleanup_temp_files(pipeline_id: str):
"""Clean up temporary files after pipeline completion"""
import glob
temp_files = glob.glob(f"/tmp/*{pipeline_id}*")
removed_files = []
for file_path in temp_files:
try:
os.remove(file_path)
removed_files.append(file_path)
except OSError:
pass
# Clean up KV store entries
keys_to_remove = [
f"pipeline-{pipeline_id}-extract",
f"pipeline-{pipeline_id}-transform",
f"pipeline-{pipeline_id}-metadata"
]
for key in keys_to_remove:
try:
HyrexKV.delete(key)
except:
pass
return {"removed_files": removed_files, "cleaned_keys": keys_to_remove}
def calculate_data_quality_score(df: pd.DataFrame) -> pd.Series:
"""Calculate data quality score for each row"""
scores = []
for _, row in df.iterrows():
score = 0
total_checks = 0
# Completeness check (non-null values)
non_null_ratio = row.notna().sum() / len(row)
score += non_null_ratio * 40 # 40% weight
total_checks += 40
# Validity checks (business rules)
if 'email' in row and pd.notna(row['email']):
if '@' in str(row['email']):
score += 20
total_checks += 20
if 'amount' in row and pd.notna(row['amount']):
if row['amount'] >= 0:
score += 20
total_checks += 20
# Consistency checks
if 'created_date' in row and pd.notna(row['created_date']):
if row['created_date'] <= datetime.now():
score += 20
total_checks += 20
scores.append(score if total_checks == 0 else (score / total_checks) * 100)
    # Reuse the caller's index so the assignment back to df still lines up after rows were dropped
    return pd.Series(scores, index=df.index)
# Schedule daily ETL
@hy.cron("0 2 * * *") # Run at 2 AM daily
def run_daily_etl():
"""Schedule daily ETL pipeline execution"""
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
pipeline_id = f"daily_{yesterday}_{int(datetime.now().timestamp())}"
# Send workflow with arguments
workflow_run = daily_etl_pipeline.send(ETLContext(
pipeline_id=pipeline_id,
source_table="raw_events",
destination_table="processed_events",
date=yesterday
))
return {
"pipeline_id": pipeline_id,
"workflow_run_id": workflow_run.workflow_run_id,
"date": yesterday,
"started_at": datetime.now().isoformat()
}
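The overview above mentions API sources and retry-based error recovery, but the tasks shown only cover database extraction and rollback. Below is a minimal sketch of an API extract step with in-task exponential backoff. It builds on the imports and ETLContext defined above; the EVENTS_API_URL environment variable and the {"events": [...]} response shape are illustrative assumptions, not part of Hyrex.
import time

@hy.task(timeout_seconds=600)
def extract_from_api(config: ETLContext):
    """Sketch: pull one day of events from an HTTP API with simple retries"""
    # EVENTS_API_URL and the {"events": [...]} response shape are assumptions
    url = os.environ.get("EVENTS_API_URL", "https://example.com/api/events")
    max_attempts = 3
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(url, params={"date": config.date}, timeout=30)
            response.raise_for_status()
            df = pd.DataFrame(response.json()["events"])
            temp_file = f"/tmp/extract_api_{config.pipeline_id}_{config.date}.parquet"
            df.to_parquet(temp_file, compression='snappy')
            return {"status": "success", "row_count": len(df), "file_path": temp_file}
        except (requests.RequestException, KeyError, ValueError) as exc:
            last_error = exc
            time.sleep(2 ** attempt)  # back off 2s, 4s, 8s before the next attempt
    raise Exception(f"API extract failed after {max_attempts} attempts: {last_error}")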
Usage Examples
Manual Pipeline Execution
# Trigger ETL pipeline for specific date
curl -X POST http://localhost:8000/etl/run \
-H "Content-Type: application/json" \
-d '{
"pipeline_id": "manual_2024_01_15",
"source_table": "raw_transactions",
"destination_table": "clean_transactions",
"date": "2024-01-15",
"batch_size": 500
}'
# Check pipeline status
curl http://localhost:8000/etl/status/manual_2024_01_15
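The endpoints called above are not defined anywhere in this example. One way to provide them is a thin FastAPI layer that validates the request with the same ETLContext schema and hands off to the workflow; this is a minimal sketch that assumes ETLContext, daily_etl_pipeline, HyrexKV, and json are importable from the task module, and the routes and response shape simply mirror the curl commands rather than any Hyrex API.
from fastapi import FastAPI

app = FastAPI()

@app.post("/etl/run")
def run_etl(config: ETLContext):
    # Kick off the workflow defined in the task module
    workflow_run = daily_etl_pipeline.send(config)
    return {
        "pipeline_id": config.pipeline_id,
        "workflow_run_id": workflow_run.workflow_run_id,
    }

@app.get("/etl/status/{pipeline_id}")
def etl_status(pipeline_id: str):
    # Approximate status from the metadata the extract task stores in HyrexKV
    raw = HyrexKV.get(f"pipeline-{pipeline_id}-metadata")
    return json.loads(raw) if raw else {"status": "unknown"}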
Pipeline Monitoring
@hy.task
def monitor_pipeline_health():
"""Monitor ETL pipeline performance"""
# Check recent pipeline runs
recent_runs = get_recent_pipeline_runs(days=7)
metrics = {
"total_runs": len(recent_runs),
"success_rate": sum(1 for run in recent_runs if run['status'] == 'success') / len(recent_runs) * 100,
"avg_duration": sum(run['duration'] for run in recent_runs) / len(recent_runs),
"data_volume": sum(run.get('row_count', 0) for run in recent_runs)
}
# Alert if success rate below threshold
if metrics['success_rate'] < 95:
send_alert("ETL pipeline success rate below 95%", metrics)
return metrics
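monitor_pipeline_health calls two helpers that this example never defines. A possible shape for them is sketched below, assuming run summaries land in a pipeline_runs table in the destination database and that alerts go to a generic webhook; the table name, its columns, and ALERT_WEBHOOK_URL are all assumptions.
def get_recent_pipeline_runs(days: int = 7) -> List[Dict[str, Any]]:
    """Sketch: read recent run summaries from an assumed pipeline_runs table"""
    conn = psycopg2.connect(os.environ.get('DESTINATION_DATABASE_URL'))
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT pipeline_id, status, duration, row_count, started_at
            FROM pipeline_runs
            WHERE started_at >= NOW() - %s
            """,
            (timedelta(days=days),),  # psycopg2 adapts timedelta to an SQL interval
        )
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        conn.close()

def send_alert(message: str, details: Dict[str, Any]):
    """Sketch: forward an alert to a webhook (ALERT_WEBHOOK_URL is an assumption)"""
    requests.post(
        os.environ["ALERT_WEBHOOK_URL"],
        json={"message": message, "details": details},
        timeout=10,
    )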
Production Considerations
- Data lineage: Track data flow and transformations for compliance
- Incremental processing: Process only changed/new data for efficiency (see the watermark sketch after this list)
- Schema evolution: Handle source schema changes gracefully
- Data quality monitoring: Implement comprehensive data validation
- Resource management: Monitor memory and CPU usage for large datasets
- Backup and recovery: Implement rollback mechanisms for failed loads
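As referenced above, incremental processing can be layered onto this pipeline by tracking a per-table watermark in HyrexKV and extracting only rows past it. The sketch below assumes the source table has an updated_at column and that watermark keys can be stored without an expiry; both are assumptions about your environment, not part of the example above.
@hy.task(timeout_seconds=1800)
def extract_incremental(config: ETLContext):
    """Sketch: extract only rows changed since the last successful run"""
    # The "etl-watermark-<table>" key and the updated_at column are assumptions
    watermark_key = f"etl-watermark-{config.source_table}"
    last_watermark = HyrexKV.get(watermark_key) or "1970-01-01T00:00:00"

    source_conn = psycopg2.connect(os.environ.get('SOURCE_DATABASE_URL'))
    try:
        df = pd.read_sql_query(
            f"SELECT * FROM {config.source_table} WHERE updated_at > %s ORDER BY updated_at",
            source_conn,
            params=[last_watermark],
        )
    finally:
        source_conn.close()

    if df.empty:
        return {"status": "no_new_data", "row_count": 0}

    temp_file = f"/tmp/extract_{config.pipeline_id}_incremental.parquet"
    df.to_parquet(temp_file, compression='snappy')
    HyrexKV.set(f"pipeline-{config.pipeline_id}-extract", temp_file, expiry_seconds=7200)

    # Advance the watermark only after the extract has been persisted
    HyrexKV.set(watermark_key, pd.to_datetime(df['updated_at']).max().isoformat())
    return {"status": "success", "row_count": len(df), "file_path": temp_file}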