Real-world examples and patterns for building with Hyrex
## Image Processing Pipeline

```python
from hyrex import HyrexRegistry
from pydantic import BaseModel
import requests

hy = HyrexRegistry()

class ImageProcessingContext(BaseModel):
    image_url: str
    user_id: int
    sizes: list[tuple[int, int]] = [(100, 100), (300, 300), (800, 800)]

@hy.task(
    queue="image-processing",
    max_retries=3,
    timeout_seconds=300
)
def process_uploaded_image(context: ImageProcessingContext):
    """Generate multiple sizes of an uploaded image"""
    # Download the original image; fail fast so Hyrex can retry
    response = requests.get(context.image_url, timeout=30)
    response.raise_for_status()
    image_data = response.content

    processed_images = []
    for width, height in context.sizes:
        # Your image processing logic here
        # Example: resize, optimize, upload to S3
        processed_url = f"https://cdn.example.com/{context.user_id}/{width}x{height}.jpg"
        processed_images.append(processed_url)

    # Notify user or update database
    return {
        "original": context.image_url,
        "processed": processed_images,
        "user_id": context.user_id
    }
```
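The processing loop above is deliberately left abstract. One way to fill it in is with Pillow; the sketch below assumes Pillow is installed and that you have some `upload_to_s3` helper for your storage layer (both are assumptions, not part of Hyrex):

```python
from io import BytesIO
from PIL import Image  # assumption: Pillow is installed

def resize_and_upload(image_data: bytes, user_id: int, width: int, height: int) -> str:
    """Resize the original bytes and upload them; returns the CDN URL."""
    img = Image.open(BytesIO(image_data))
    img.thumbnail((width, height))  # shrinks in place, preserving aspect ratio
    buffer = BytesIO()
    img.convert("RGB").save(buffer, format="JPEG", optimize=True)
    # upload_to_s3 is a hypothetical helper for your storage layer
    return upload_to_s3(buffer.getvalue(), f"{user_id}/{width}x{height}.jpg")
```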
```python
# In your web application
@app.post("/upload")
def handle_upload(image: UploadFile, user_id: int):
    # Save uploaded file
    image_url = save_to_storage(image)

    # Queue processing task
    task = process_uploaded_image.send(
        ImageProcessingContext(
            image_url=image_url,
            user_id=user_id
        )
    )
    return {"message": "Upload successful", "task_id": task.id}
```
## Sending Transactional Emails

```python
from datetime import datetime

class EmailContext(BaseModel):
    to: str
    subject: str
    template: str
    context: dict

@hy.task(
    max_retries=5,
    retry_backoff=True  # Built-in exponential backoff
)
def send_email(context: EmailContext):
    """Send transactional emails with automatic retries"""
    # Render template
    html_body = render_template(context.template, context.context)

    # Send via provider
    response = email_provider.send(
        to=context.to,
        subject=context.subject,
        html=html_body
    )

    if response.status_code >= 500:
        # Raise on server errors so Hyrex retries
        raise Exception(f"Email provider error: {response.status_code}")

    return {
        "message_id": response.message_id,
        "sent_at": datetime.now().isoformat()
    }
```
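`render_template` and `email_provider` stand in for your own stack. As one example, a Jinja2-based `render_template` might look like this (Jinja2 and the `templates/` directory are assumptions):

```python
from jinja2 import Environment, FileSystemLoader

# Assumption: email templates live in ./templates
env = Environment(loader=FileSystemLoader("templates"), autoescape=True)

def render_template(name: str, context: dict) -> str:
    """Render an HTML email body from a Jinja2 template."""
    return env.get_template(name).render(**context)
```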
```python
# Send welcome email after user signup
@app.post("/signup")
def signup(user: UserCreate):
    user_id = create_user(user)

    # Queue welcome email
    task = send_email.send(EmailContext(
        to=user.email,
        subject="Welcome to Our App!",
        template="welcome.html",
        context={"name": user.name, "user_id": user_id}
    ))
    return {"user_id": user_id, "email_task_id": str(task.id)}
```
## ETL Pipeline Workflow

```python
from datetime import datetime, timedelta
from hyrex import HyrexKV, get_hyrex_context

class ETLContext(BaseModel):
    source_table: str
    destination_table: str
    date: str

@hy.task
def extract_data(config: ETLContext):
    """Extract data from source"""
    ctx = get_hyrex_context()

    # NOTE: only interpolate trusted, internal values into SQL like this
    query = f"SELECT * FROM {config.source_table} WHERE date = '{config.date}'"
    data = database.execute(query)

    # Save to temporary storage
    file_path = f"/tmp/extract_{config.date}.parquet"
    save_parquet(data, file_path)

    # Store path for next task
    HyrexKV.set(f"workflow-{ctx.workflow_run_id}-extract", file_path)
    return {"file_path": file_path, "row_count": len(data)}

@hy.task
def transform_data(config: ETLContext):
    """Transform extracted data"""
    ctx = get_hyrex_context()

    # Get file path from previous task
    file_path = HyrexKV.get(f"workflow-{ctx.workflow_run_id}-extract")

    # Load and transform
    df = load_parquet(file_path)
    df = apply_transformations(df)

    # Save transformed data
    output_path = file_path.replace("extract_", "transform_")
    save_parquet(df, output_path)

    # Store for next task
    HyrexKV.set(f"workflow-{ctx.workflow_run_id}-transform", output_path)
    return {"file_path": output_path, "row_count": len(df)}

@hy.task
def load_data(config: ETLContext):
    """Load data to destination"""
    ctx = get_hyrex_context()

    # Get file path from previous task
    file_path = HyrexKV.get(f"workflow-{ctx.workflow_run_id}-transform")

    # Load to destination
    df = load_parquet(file_path)
    database.bulk_insert(config.destination_table, df)

    # Clean up temp files
    cleanup_files([file_path])
    return {"loaded_rows": len(df)}

@hy.workflow(workflow_arg_schema=ETLContext)
def daily_etl_pipeline():
    """Complete ETL pipeline workflow"""
    # Define the DAG
    extract_data >> transform_data >> load_data
```

Each step hands its output to the next through `HyrexKV`, keyed by the shared `workflow_run_id`.
```python
# Schedule daily ETL
@hy.task(cron="0 2 * * *")  # Run at 2 AM daily
def run_daily_etl():
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    # Send workflow with arguments
    daily_etl_pipeline.send(ETLContext(
        source_table="raw_events",
        destination_table="processed_events",
        date=yesterday
    ))
```
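The pipeline leans on a few helpers (`save_parquet`, `load_parquet`, `cleanup_files`). A minimal sketch, assuming pandas with pyarrow installed:

```python
import os
import pandas as pd  # assumption: pandas + pyarrow for Parquet support

def save_parquet(data, path: str) -> None:
    """Persist rows (or a DataFrame) to a Parquet temp file."""
    pd.DataFrame(data).to_parquet(path, index=False)

def load_parquet(path: str) -> pd.DataFrame:
    return pd.read_parquet(path)

def cleanup_files(paths: list[str]) -> None:
    for p in paths:
        os.remove(p)
```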
## Working with Rate-Limited APIs

```python
from hyrex import HyrexKV
import time
import json

class APICallContext(BaseModel):
    endpoint: str
    params: dict
    api_name: str = "external_api"

@hy.task(
    max_retries=10,
    retry_backoff=True  # Built-in exponential backoff
)
def call_rate_limited_api(context: APICallContext):
    """Call external API with rate limit handling"""
    # Track rate limit usage in the KV store.
    # NOTE: this read-modify-write is not atomic, so concurrent workers
    # can miscount slightly; treat the limit as a soft budget.
    rate_key = f"rate_limit:{context.api_name}"
    try:
        rate_data = HyrexKV.get(rate_key)
        rate_info = json.loads(rate_data)
        calls_made = rate_info.get("calls", 0)
        reset_time = rate_info.get("reset_time", 0)
    except Exception:
        # No counter stored yet
        calls_made = 0
        reset_time = 0

    # Reset counter if the hour has passed
    if time.time() > reset_time:
        calls_made = 0
        reset_time = time.time() + 3600  # Reset in 1 hour

    if calls_made >= 100:  # 100 calls per hour limit
        # Raise so Hyrex retries later with backoff
        raise Exception("Rate limit exceeded, will retry")

    # Make API call
    response = requests.get(
        f"https://api.example.com/{context.endpoint}",
        params=context.params
    )

    # Update rate limit counter
    calls_made += 1
    HyrexKV.set(rate_key, json.dumps({
        "calls": calls_made,
        "reset_time": reset_time
    }))

    if response.status_code == 429:
        # Rate limited by the API itself
        retry_after = int(response.headers.get("Retry-After", 60))
        raise Exception(f"Rate limited, retry after {retry_after}s")

    return response.json()
```
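Enqueueing a call then looks like any other task send; the endpoint, params, and API name below are illustrative:

```python
task = call_rate_limited_api.send(APICallContext(
    endpoint="orders",
    params={"limit": 50},
    api_name="orders_api"
))
```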
## Batch Processing

```python
import json

class BatchContext(BaseModel):
    batch_id: int
    total_batches: int
    items: list[dict]

@hy.task(queue="batch-processing")
def process_batch(context: BatchContext):
    """Process a batch of items"""
    results = []
    for item in context.items:
        # Process each item
        result = process_item(item)
        results.append(result)

    # Store results in the KV store
    HyrexKV.set(
        f"batch_results:{context.batch_id}",
        json.dumps({
            "results": results,
            "processed_at": datetime.now().isoformat()
        })
    )
    return {
        "batch_id": context.batch_id,
        "processed": len(results),
        "success": True
    }

def process_large_dataset(items: list[dict], batch_size: int = 100):
    """Split a large dataset into batches"""
    batches = []
    total_batches = (len(items) + batch_size - 1) // batch_size

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # Send each batch as a separate task
        task = process_batch.send(BatchContext(
            batch_id=i // batch_size,
            total_batches=total_batches,
            items=batch
        ))
        batches.append(task)
    return batches

# Helper to retrieve all batch results
def get_batch_results(total_batches: int) -> list:
    """Retrieve and combine results from all batches"""
    all_results = []
    for batch_id in range(total_batches):
        try:
            batch_data = HyrexKV.get(f"batch_results:{batch_id}")
            batch_info = json.loads(batch_data)
            all_results.extend(batch_info["results"])
        except Exception:
            pass  # Batch not yet processed
    return all_results
```
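Putting the pieces together (`load_items` and `process_item` stand in for hypothetical application functions):

```python
items = load_items()  # hypothetical: fetch the records to process
tasks = process_large_dataset(items, batch_size=100)

# Later, from a follow-up task or an endpoint:
results = get_batch_results(total_batches=len(tasks))
print(f"{len(results)} items processed so far")
```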
## Monitoring and Error Handling

```python
import traceback
import logging
from hyrex import get_hyrex_context

logger = logging.getLogger(__name__)

class MonitoredTaskContext(BaseModel):
    operation: str
    data: dict

@hy.task(max_retries=3)
def monitored_task(context: MonitoredTaskContext):
    """Task with built-in monitoring and error handling"""
    start_time = time.time()
    task_context = get_hyrex_context()

    try:
        # Track task execution count
        try:
            count_data = HyrexKV.get(f"task_count:{context.operation}")
            count = json.loads(count_data)["count"]
        except Exception:
            count = 0
        HyrexKV.set(
            f"task_count:{context.operation}",
            json.dumps({"count": count + 1, "last_run": datetime.now().isoformat()})
        )

        # Perform operation
        result = perform_operation(context.operation, context.data)

        # Record success metrics
        duration = time.time() - start_time
        HyrexKV.set(
            f"task_metrics:{context.operation}:{task_context.task_id if task_context else 'unknown'}",
            json.dumps({
                "duration": duration,
                "status": "success",
                "timestamp": datetime.now().isoformat()
            })
        )
        return {
            "success": True,
            "result": result,
            "duration": duration
        }

    except ValidationError as e:
        # Don't retry validation errors; bad input won't fix itself
        return {
            "success": False,
            "error": "validation_error",
            "details": str(e)
        }

    except ExternalAPIError as e:
        # Retry server-side errors; treat client errors as final
        if e.status_code >= 500:
            raise  # Will trigger retry
        return {
            "success": False,
            "error": "api_error",
            "details": str(e)
        }

    except Exception as e:
        # Log unexpected errors
        logger.error(f"Task failed: {context.operation}", exc_info=True)

        # Store error details for debugging
        error_key = f"task_error:{context.operation}:{time.time()}"
        HyrexKV.set(
            error_key,
            json.dumps({
                "error": str(e),
                "error_type": type(e).__name__,
                "context": context.dict(),
                "traceback": traceback.format_exc(),
                "attempt": task_context.attempt_number if task_context else 0,
                "timestamp": datetime.now().isoformat()
            })
        )
        raise  # Will trigger retry
```
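The example assumes two application-defined exception types. A minimal sketch of what they might look like:

```python
class ValidationError(Exception):
    """Input data failed validation; never worth retrying."""

class ExternalAPIError(Exception):
    """An upstream API call failed."""
    def __init__(self, message: str, status_code: int):
        super().__init__(message)
        self.status_code = status_code
```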