Overview
This example demonstrates efficient batch processing patterns:
- Dataset chunking - Split large datasets into optimal batch sizes (a minimal chunking sketch follows this list)
- Parallel processing - Process multiple batches simultaneously
- Progress tracking - Monitor batch completion and overall progress
- Result aggregation - Combine results from all processed batches
- Error recovery - Handle individual batch failures without stopping entire job
- Resource optimization - Balance throughput with memory usage
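Before the full example, here is a minimal, hypothetical chunking helper in the same style; the tasks below perform the equivalent slicing inline with index arithmetic.

# Hypothetical chunking sketch; not part of the task definitions below.
from typing import Any, Dict, Iterator, List

def chunk_items(items: List[Dict[str, Any]], batch_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive batch_size-sized slices of items"""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]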
Task Definitions
from hyrex import HyrexRegistry, HyrexKV
from pydantic import BaseModel
from typing import List, Dict, Any
import json
import time
import math
from datetime import datetime

hy = HyrexRegistry()


class BatchContext(BaseModel):
    batch_id: int
    total_batches: int
    items: List[Dict[str, Any]]
    job_id: str
    batch_size: int
    processing_type: str


class BatchJobContext(BaseModel):
    job_id: str
    total_items: int
    batch_size: int = 100
    max_parallel_batches: int = 5
    processing_type: str = "default"


@hy.task(queue="batch-processing", max_retries=3)
def process_batch(context: BatchContext):
    """Process a single batch of items"""
    start_time = time.time()
    results = []
    errors = []

    try:
        for i, item in enumerate(context.items):
            try:
                # Process individual item based on type
                if context.processing_type == "data_validation":
                    result = validate_data_item(item)
                elif context.processing_type == "data_enrichment":
                    result = enrich_data_item(item)
                elif context.processing_type == "format_conversion":
                    result = convert_data_format(item)
                else:
                    result = process_default_item(item)

                results.append({
                    "item_id": item.get("id", i),
                    "status": "success",
                    "result": result
                })
            except Exception as e:
                errors.append({
                    "item_id": item.get("id", i),
                    "error": str(e),
                    "item": item
                })

        # Store batch results in KV store
        batch_result = {
            "batch_id": context.batch_id,
            "job_id": context.job_id,
            "processed_items": len(results),
            "failed_items": len(errors),
            "results": results,
            "errors": errors,
            "processing_time": time.time() - start_time,
            "processed_at": datetime.now().isoformat()
        }

        HyrexKV.set(
            f"batch_result:{context.job_id}:{context.batch_id}",
            json.dumps(batch_result),
            expiry_seconds=86400  # 24 hours
        )

        # Update job progress
        update_job_progress(context.job_id, context.batch_id, context.total_batches)

        return {
            "batch_id": context.batch_id,
            "processed": len(results),
            "failed": len(errors),
            "success": len(errors) == 0,
            "processing_time": time.time() - start_time
        }

    except Exception as e:
        # Batch-level error - store for retry
        error_result = {
            "batch_id": context.batch_id,
            "job_id": context.job_id,
            "status": "failed",
            "error": str(e),
            "failed_at": datetime.now().isoformat()
        }

        HyrexKV.set(
            f"batch_error:{context.job_id}:{context.batch_id}",
            json.dumps(error_result),
            expiry_seconds=86400
        )

        raise e
@hy.task(timeout_seconds=3600)  # 1 hour timeout
def process_large_dataset(context: BatchJobContext):
    """Orchestrate processing of a large dataset in batches"""
    try:
        # Initialize job tracking
        job_metadata = {
            "job_id": context.job_id,
            "total_items": context.total_items,
            "batch_size": context.batch_size,
            "total_batches": math.ceil(context.total_items / context.batch_size),
            "processing_type": context.processing_type,
            "started_at": datetime.now().isoformat(),
            "status": "processing",
            "completed_batches": 0,
            "failed_batches": 0
        }

        HyrexKV.set(
            f"job_metadata:{context.job_id}",
            json.dumps(job_metadata),
            expiry_seconds=86400
        )

        # Load dataset (this would typically come from a database or file)
        items = load_dataset(context.total_items, context.processing_type)

        # Split into batches and submit each one for parallel processing
        # (see Production Considerations for throttled submission)
        total_batches = math.ceil(len(items) / context.batch_size)
        batch_tasks = []

        for i in range(0, len(items), context.batch_size):
            batch_items = items[i:i + context.batch_size]
            batch_id = i // context.batch_size

            batch_context = BatchContext(
                batch_id=batch_id,
                total_batches=total_batches,
                items=batch_items,
                job_id=context.job_id,
                batch_size=context.batch_size,
                processing_type=context.processing_type
            )

            # Submit batch for processing
            task = process_batch.send(batch_context)
            batch_tasks.append({
                "batch_id": batch_id,
                "task_id": task.task_id,
                "submitted_at": datetime.now().isoformat()
            })

        # Update job metadata with batch info
        job_metadata["batch_tasks"] = batch_tasks
        job_metadata["batches_submitted"] = len(batch_tasks)

        HyrexKV.set(
            f"job_metadata:{context.job_id}",
            json.dumps(job_metadata),
            expiry_seconds=86400
        )

        return {
            "job_id": context.job_id,
            "status": "batches_submitted",
            "total_batches": total_batches,
            "batch_size": context.batch_size,
            "submitted_at": datetime.now().isoformat()
        }

    except Exception as e:
        # Update job status to failed
        job_metadata["status"] = "failed"
        job_metadata["error"] = str(e)
        job_metadata["failed_at"] = datetime.now().isoformat()

        HyrexKV.set(
            f"job_metadata:{context.job_id}",
            json.dumps(job_metadata),
            expiry_seconds=86400
        )

        raise e
@hy.task
def aggregate_batch_results(job_id: str):
    """Aggregate results from all completed batches"""
    # Get job metadata
    job_metadata_str = HyrexKV.get(f"job_metadata:{job_id}")
    if not job_metadata_str:
        raise Exception(f"Job metadata not found for {job_id}")

    job_metadata = json.loads(job_metadata_str)
    total_batches = job_metadata["total_batches"]

    # Collect all batch results
    all_results = []
    all_errors = []
    completed_batches = 0
    failed_batches = 0
    total_processing_time = 0

    for batch_id in range(total_batches):
        # Try to get batch result
        batch_result_str = HyrexKV.get(f"batch_result:{job_id}:{batch_id}")
        if batch_result_str:
            batch_result = json.loads(batch_result_str)
            all_results.extend(batch_result["results"])
            all_errors.extend(batch_result["errors"])
            total_processing_time += batch_result["processing_time"]
            completed_batches += 1
        else:
            # Check if batch failed
            batch_error_str = HyrexKV.get(f"batch_error:{job_id}:{batch_id}")
            if batch_error_str:
                failed_batches += 1
                batch_error = json.loads(batch_error_str)
                all_errors.append({
                    "batch_id": batch_id,
                    "batch_error": batch_error["error"],
                    "type": "batch_failure"
                })

    # Generate final report
    total_attempted = len(all_results) + len(all_errors)
    final_report = {
        "job_id": job_id,
        "total_items_processed": len(all_results),
        "total_items_failed": len(all_errors),
        "success_rate": (len(all_results) / total_attempted) * 100 if total_attempted > 0 else 0,
        "completed_batches": completed_batches,
        "failed_batches": failed_batches,
        "total_processing_time": total_processing_time,
        "avg_batch_time": total_processing_time / completed_batches if completed_batches > 0 else 0,
        "aggregated_at": datetime.now().isoformat(),
        "detailed_results": all_results[:100],  # First 100 for preview
        "error_summary": summarize_errors(all_errors)
    }

    # Store final report
    HyrexKV.set(
        f"final_report:{job_id}",
        json.dumps(final_report),
        expiry_seconds=604800  # 7 days
    )

    # Update job metadata
    job_metadata["status"] = "completed" if failed_batches == 0 else "completed_with_errors"
    job_metadata["completed_at"] = datetime.now().isoformat()
    job_metadata["final_report"] = final_report

    HyrexKV.set(
        f"job_metadata:{job_id}",
        json.dumps(job_metadata),
        expiry_seconds=604800
    )

    return final_report
def update_job_progress(job_id: str, completed_batch_id: int, total_batches: int):
    """Update job progress tracking"""
    progress_key = f"job_progress:{job_id}"

    try:
        progress_data = HyrexKV.get(progress_key)
        if progress_data:
            progress = json.loads(progress_data)
            # JSON stores the completed batch IDs as a list; convert back to a set
            progress["completed_batches"] = set(progress["completed_batches"])
        else:
            progress = {"completed_batches": set(), "total_batches": total_batches}

        progress["completed_batches"].add(completed_batch_id)
        progress["completion_percentage"] = (len(progress["completed_batches"]) / total_batches) * 100
        progress["last_updated"] = datetime.now().isoformat()

        # Convert set to list for JSON serialization
        progress_to_store = {
            **progress,
            "completed_batches": list(progress["completed_batches"])
        }

        HyrexKV.set(progress_key, json.dumps(progress_to_store), expiry_seconds=86400)

        # Check if all batches are complete
        if len(progress["completed_batches"]) == total_batches:
            # Trigger aggregation
            aggregate_batch_results.send(job_id)

    except Exception as e:
        print(f"Failed to update progress for job {job_id}: {e}")
def load_dataset(total_items: int, processing_type: str) -> List[Dict[str, Any]]:
    """Load dataset for processing (mock implementation)"""
    # In production, this would load from a database, file, API, etc.
    items = []
    for i in range(total_items):
        if processing_type == "data_validation":
            items.append({
                "id": i,
                "email": f"user{i}@example.com",
                "age": 20 + (i % 60),
                "country": ["US", "CA", "UK", "DE", "FR"][i % 5]
            })
        elif processing_type == "data_enrichment":
            items.append({
                "id": i,
                "user_id": i,
                "purchase_amount": round((i % 1000) + 10.50, 2)
            })
        else:
            items.append({
                "id": i,
                "data": f"item_{i}_data",
                "value": i * 2
            })
    return items


def process_default_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Default item processing"""
    return {
        "processed_id": item["id"],
        "processed_value": item.get("value", 0) * 2,
        "processed_at": datetime.now().isoformat()
    }


def validate_data_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Validate data item"""
    validations = {
        "email_valid": "@" in item.get("email", ""),
        "age_valid": 0 <= item.get("age", 0) <= 150,
        "country_valid": item.get("country") in ["US", "CA", "UK", "DE", "FR"]
    }
    return {
        "item_id": item["id"],
        "validations": validations,
        "is_valid": all(validations.values())
    }


def enrich_data_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Enrich data item with additional information"""
    return {
        "original_item": item,
        "enriched_data": {
            "purchase_category": "premium" if item.get("purchase_amount", 0) > 500 else "standard",
            "customer_tier": "gold" if item.get("user_id", 0) % 10 == 0 else "silver"
        },
        "enriched_at": datetime.now().isoformat()
    }


def convert_data_format(item: Dict[str, Any]) -> Dict[str, Any]:
    """Convert data item to a different format"""
    return {
        "legacy_id": item["id"],
        "new_format": {
            "identifier": f"NEW_{item['id']:06d}",
            "metadata": item.get("data", ""),
            "computed_value": item.get("value", 0) ** 2
        }
    }


def summarize_errors(errors: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize error patterns"""
    error_types = {}
    for error in errors:
        error_type = error.get("type", "processing_error")
        error_types[error_type] = error_types.get(error_type, 0) + 1
    return {
        "total_errors": len(errors),
        "error_types": error_types,
        "sample_errors": errors[:10]  # First 10 errors as samples
    }
Usage Examples
Start Batch Processing Job
# Process large dataset with validation
curl -X POST http://localhost:8000/batch/start \
-H "Content-Type: application/json" \
-d '{
"job_id": "validation_job_20241201",
"total_items": 10000,
"batch_size": 200,
"processing_type": "data_validation"
}'
# Check job progress
curl http://localhost:8000/batch/progress/validation_job_20241201
# Get final results
curl http://localhost:8000/batch/results/validation_job_20241201
Monitor Batch Processing
@hy.task
def monitor_batch_jobs():
    """Monitor active batch processing jobs"""
    active_jobs = get_active_batch_jobs()

    for job in active_jobs:
        progress = get_job_progress(job["job_id"])

        if progress["completion_percentage"] > 0:
            estimated_completion = calculate_eta(
                progress["completion_percentage"],
                job["started_at"]
            )
            print(f"Job {job['job_id']}: {progress['completion_percentage']:.1f}% "
                  f"complete, ETA: {estimated_completion}")
REST API Integration
import json

from fastapi import FastAPI, HTTPException
from hyrex import HyrexKV

from .tasks import BatchJobContext, process_large_dataset

app = FastAPI()


@app.post("/batch/start")
async def start_batch_job(request: BatchJobContext):
    """Start a new batch processing job"""
    task = process_large_dataset.send(request)
    return {
        "message": "Batch job started",
        "task_id": task.task_id,
        "job_id": request.job_id
    }


@app.get("/batch/progress/{job_id}")
async def get_batch_progress(job_id: str):
    """Get batch job progress"""
    progress_data = HyrexKV.get(f"job_progress:{job_id}")
    if not progress_data:
        raise HTTPException(status_code=404, detail="Job not found")
    return json.loads(progress_data)


@app.get("/batch/results/{job_id}")
async def get_batch_results(job_id: str):
    """Get final batch processing results"""
    results_data = HyrexKV.get(f"final_report:{job_id}")
    if not results_data:
        raise HTTPException(status_code=404, detail="Results not available")
    return json.loads(results_data)
Production Considerations
- Memory management: Monitor memory usage with large batches
- Error isolation: Ensure single batch failures don’t affect others
- Progress persistence: Store progress to handle worker restarts
- Result storage: Use appropriate storage for large result sets
- Concurrency control - Balance throughput with resource constraints (a throttling sketch follows this list)
- Data consistency: Ensure atomic operations for critical data
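The orchestrator above submits every batch at once and never uses the max_parallel_batches field on BatchJobContext. One way to honor it is to submit work in waves and poll the KV progress record between waves. The sketch below is a hypothetical variant under those assumptions, reusing the tasks and keys defined earlier; a production version would also check batch_error keys and bound the wait with a timeout.

# Hypothetical throttled submission loop; assumes the same imports, tasks, and KV keys as above.
def submit_batches_throttled(context: BatchJobContext, batch_contexts: List[BatchContext]):
    """Submit batches in waves of max_parallel_batches, waiting on KV progress between waves"""
    for wave_start in range(0, len(batch_contexts), context.max_parallel_batches):
        wave = batch_contexts[wave_start:wave_start + context.max_parallel_batches]
        for batch_context in wave:
            process_batch.send(batch_context)

        # Wait until this wave's batches have reported completion before submitting more.
        # Note: a permanently failed batch never updates progress, so add a timeout in practice.
        expected = wave_start + len(wave)
        while True:
            progress_data = HyrexKV.get(f"job_progress:{context.job_id}")
            completed = len(json.loads(progress_data)["completed_batches"]) if progress_data else 0
            if completed >= expected:
                break
            time.sleep(5)  # Hypothetical polling interval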