Build scalable batch processing systems that handle large datasets by breaking them into manageable chunks with progress tracking and result aggregation.

Overview

This example demonstrates efficient batch processing patterns:
  • Dataset chunking - Split large datasets into optimal batch sizes
  • Parallel processing - Process multiple batches simultaneously
  • Progress tracking - Monitor batch completion and overall progress
  • Result aggregation - Combine results from all processed batches
  • Error recovery - Handle individual batch failures without stopping entire job
  • Resource optimization - Balance throughput with memory usage
Perfect for data migrations, bulk operations, report generation, or any large-scale data processing.
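
At a high level, a job moves through three stages: an orchestrator task splits the dataset into batches, each batch is processed independently on a batch-processing queue, and an aggregation task combines the stored results once every batch has reported in. A minimal sketch of kicking off a run with the tasks defined below (the job_id and sizes are illustrative values):

# Illustrative kick-off using the tasks defined in this example
job = BatchJobContext(
    job_id="nightly_validation",
    total_items=10000,
    batch_size=200,
    processing_type="data_validation",
)
process_large_dataset.send(job)
# Each completed batch updates job_progress:<job_id>; when the last batch
# finishes, aggregate_batch_results() writes final_report:<job_id> to HyrexKV.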

Task Definitions

from hyrex import HyrexRegistry, HyrexKV
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import json
import time
import math
from datetime import datetime

hy = HyrexRegistry()

class BatchContext(BaseModel):
    batch_id: int
    total_batches: int
    items: List[Dict[str, Any]]
    job_id: str
    batch_size: int
    processing_type: str

class BatchJobContext(BaseModel):
    job_id: str
    total_items: int
    batch_size: int = 100
    max_parallel_batches: int = 5
    processing_type: str = "default"

@hy.task(queue="batch-processing", max_retries=3)
def process_batch(context: BatchContext):
    """Process a single batch of items"""
    
    start_time = time.time()
    results = []
    errors = []
    
    try:
        for i, item in enumerate(context.items):
            try:
                # Process individual item based on type
                if context.processing_type == "data_validation":
                    result = validate_data_item(item)
                elif context.processing_type == "data_enrichment":
                    result = enrich_data_item(item)
                elif context.processing_type == "format_conversion":
                    result = convert_data_format(item)
                else:
                    result = process_default_item(item)
                
                results.append({
                    "item_id": item.get("id", i),
                    "status": "success",
                    "result": result
                })
                
            except Exception as e:
                errors.append({
                    "item_id": item.get("id", i),
                    "error": str(e),
                    "item": item
                })
        
        # Store batch results in KV store
        batch_result = {
            "batch_id": context.batch_id,
            "job_id": context.job_id,
            "processed_items": len(results),
            "failed_items": len(errors),
            "results": results,
            "errors": errors,
            "processing_time": time.time() - start_time,
            "processed_at": datetime.now().isoformat()
        }
        
        HyrexKV.set(
            f"batch_result:{context.job_id}:{context.batch_id}",
            json.dumps(batch_result),
            expiry_seconds=86400  # 24 hours
        )
        
        # Update job progress
        update_job_progress(context.job_id, context.batch_id, context.total_batches)
        
        return {
            "batch_id": context.batch_id,
            "processed": len(results),
            "failed": len(errors),
            "success": len(errors) == 0,
            "processing_time": time.time() - start_time
        }
        
    except Exception as e:
        # Batch-level error - store for retry
        error_result = {
            "batch_id": context.batch_id,
            "job_id": context.job_id,
            "status": "failed",
            "error": str(e),
            "failed_at": datetime.now().isoformat()
        }
        
        HyrexKV.set(
            f"batch_error:{context.job_id}:{context.batch_id}",
            json.dumps(error_result),
            expiry_seconds=86400
        )
        
        raise e

@hy.task(timeout_seconds=3600)  # 1 hour timeout
def process_large_dataset(context: BatchJobContext):
    """Orchestrate processing of large dataset in batches"""
    
    try:
        # Initialize job tracking
        job_metadata = {
            "job_id": context.job_id,
            "total_items": context.total_items,
            "batch_size": context.batch_size,
            "total_batches": math.ceil(context.total_items / context.batch_size),
            "processing_type": context.processing_type,
            "started_at": datetime.now().isoformat(),
            "status": "processing",
            "completed_batches": 0,
            "failed_batches": 0
        }
        
        HyrexKV.set(
            f"job_metadata:{context.job_id}",
            json.dumps(job_metadata),
            expiry_seconds=86400
        )
        
        # Load dataset (this would typically come from database/file)
        items = load_dataset(context.total_items, context.processing_type)
        
        # Split into batches and submit each one to the batch-processing queue.
        # (max_parallel_batches is not enforced here; see Production Considerations
        # for one way to throttle submission.)
        total_batches = math.ceil(len(items) / context.batch_size)

        batch_tasks = []
        
        for i in range(0, len(items), context.batch_size):
            batch_items = items[i:i + context.batch_size]
            batch_id = i // context.batch_size
            
            batch_context = BatchContext(
                batch_id=batch_id,
                total_batches=total_batches,
                items=batch_items,
                job_id=context.job_id,
                batch_size=context.batch_size,
                processing_type=context.processing_type
            )
            
            # Submit batch for processing
            task = process_batch.send(batch_context)
            batch_tasks.append({
                "batch_id": batch_id,
                "task_id": task.task_id,
                "submitted_at": datetime.now().isoformat()
            })
        
        # Update job metadata with batch info
        job_metadata["batch_tasks"] = batch_tasks
        job_metadata["batches_submitted"] = len(batch_tasks)
        
        HyrexKV.set(
            f"job_metadata:{context.job_id}",
            json.dumps(job_metadata),
            expiry_seconds=86400
        )
        
        return {
            "job_id": context.job_id,
            "status": "batches_submitted",
            "total_batches": total_batches,
            "batch_size": context.batch_size,
            "submitted_at": datetime.now().isoformat()
        }
        
    except Exception as e:
        # Update job status to failed
        job_metadata["status"] = "failed"
        job_metadata["error"] = str(e)
        job_metadata["failed_at"] = datetime.now().isoformat()
        
        HyrexKV.set(
            f"job_metadata:{context.job_id}",
            json.dumps(job_metadata),
            expiry_seconds=86400
        )
        
        raise e

@hy.task
def aggregate_batch_results(job_id: str):
    """Aggregate results from all completed batches"""
    
    # Get job metadata
    job_metadata_str = HyrexKV.get(f"job_metadata:{job_id}")
    if not job_metadata_str:
        raise Exception(f"Job metadata not found for {job_id}")
    
    job_metadata = json.loads(job_metadata_str)
    total_batches = job_metadata["total_batches"]
    
    # Collect all batch results
    all_results = []
    all_errors = []
    completed_batches = 0
    failed_batches = 0
    total_processing_time = 0
    
    for batch_id in range(total_batches):
        # Try to get batch result
        batch_result_str = HyrexKV.get(f"batch_result:{job_id}:{batch_id}")
        if batch_result_str:
            batch_result = json.loads(batch_result_str)
            all_results.extend(batch_result["results"])
            all_errors.extend(batch_result["errors"])
            total_processing_time += batch_result["processing_time"]
            completed_batches += 1
        else:
            # Check if batch failed
            batch_error_str = HyrexKV.get(f"batch_error:{job_id}:{batch_id}")
            if batch_error_str:
                failed_batches += 1
                batch_error = json.loads(batch_error_str)
                all_errors.append({
                    "batch_id": batch_id,
                    "batch_error": batch_error["error"],
                    "type": "batch_failure"
                })
    
    # Generate final report
    final_report = {
        "job_id": job_id,
        "total_items_processed": len(all_results),
        "total_items_failed": len(all_errors),
        "success_rate": (len(all_results) / (len(all_results) + len(all_errors))) * 100 if (len(all_results) + len(all_errors)) > 0 else 0,
        "completed_batches": completed_batches,
        "failed_batches": failed_batches,
        "total_processing_time": total_processing_time,
        "avg_batch_time": total_processing_time / completed_batches if completed_batches > 0 else 0,
        "aggregated_at": datetime.now().isoformat(),
        "detailed_results": all_results[:100],  # First 100 for preview
        "error_summary": summarize_errors(all_errors)
    }
    
    # Store final report
    HyrexKV.set(
        f"final_report:{job_id}",
        json.dumps(final_report),
        expiry_seconds=604800  # 7 days
    )
    
    # Update job metadata
    job_metadata["status"] = "completed" if failed_batches == 0 else "completed_with_errors"
    job_metadata["completed_at"] = datetime.now().isoformat()
    job_metadata["final_report"] = final_report
    
    HyrexKV.set(
        f"job_metadata:{job_id}",
        json.dumps(job_metadata),
        expiry_seconds=604800
    )
    
    return final_report

def update_job_progress(job_id: str, completed_batch_id: int, total_batches: int):
    """Update job progress tracking"""
    
    progress_key = f"job_progress:{job_id}"
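    # Note: this read-modify-write of the progress key is not atomic; if two
    # batches finish at the same moment, one update can be lost unless HyrexKV
    # provides atomic updates (see the data-consistency note under
    # Production Considerations).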
    
    try:
        progress_data = HyrexKV.get(progress_key)
        if progress_data:
            progress = json.loads(progress_data)
            # completed_batches is serialized as a list; restore it to a set
            progress["completed_batches"] = set(progress["completed_batches"])
        else:
            progress = {"completed_batches": set(), "total_batches": total_batches}
        
        progress["completed_batches"].add(completed_batch_id)
        progress["completion_percentage"] = (len(progress["completed_batches"]) / total_batches) * 100
        progress["last_updated"] = datetime.now().isoformat()
        
        # Convert set to list for JSON serialization
        progress_to_store = {
            **progress,
            "completed_batches": list(progress["completed_batches"])
        }
        
        HyrexKV.set(progress_key, json.dumps(progress_to_store), expiry_seconds=86400)
        
        # Check if all batches are complete
        if len(progress["completed_batches"]) == total_batches:
            # Trigger aggregation
            aggregate_batch_results.send(job_id)
        
    except Exception as e:
        print(f"Failed to update progress for job {job_id}: {e}")

def load_dataset(total_items: int, processing_type: str) -> List[Dict[str, Any]]:
    """Load dataset for processing (mock implementation)"""
    
    # In production, this would load from database, file, API, etc.
    items = []
    for i in range(total_items):
        if processing_type == "data_validation":
            items.append({
                "id": i,
                "email": f"user{i}@example.com",
                "age": 20 + (i % 60),
                "country": ["US", "CA", "UK", "DE", "FR"][i % 5]
            })
        elif processing_type == "data_enrichment":
            items.append({
                "id": i,
                "user_id": i,
                "purchase_amount": round((i % 1000) + 10.50, 2)
            })
        else:
            items.append({
                "id": i,
                "data": f"item_{i}_data",
                "value": i * 2
            })
    
    return items

def process_default_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Default item processing"""
    return {
        "processed_id": item["id"],
        "processed_value": item.get("value", 0) * 2,
        "processed_at": datetime.now().isoformat()
    }

def validate_data_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Validate data item"""
    validations = {
        "email_valid": "@" in item.get("email", ""),
        "age_valid": 0 <= item.get("age", 0) <= 150,
        "country_valid": item.get("country") in ["US", "CA", "UK", "DE", "FR"]
    }
    
    return {
        "item_id": item["id"],
        "validations": validations,
        "is_valid": all(validations.values())
    }

def enrich_data_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Enrich data item with additional information"""
    return {
        "original_item": item,
        "enriched_data": {
            "purchase_category": "premium" if item.get("purchase_amount", 0) > 500 else "standard",
            "customer_tier": "gold" if item.get("user_id", 0) % 10 == 0 else "silver"
        },
        "enriched_at": datetime.now().isoformat()
    }

def convert_data_format(item: Dict[str, Any]) -> Dict[str, Any]:
    """Convert data item to different format"""
    return {
        "legacy_id": item["id"],
        "new_format": {
            "identifier": f"NEW_{item['id']:06d}",
            "metadata": item.get("data", ""),
            "computed_value": item.get("value", 0) ** 2
        }
    }

def summarize_errors(errors: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize error patterns"""
    error_types = {}
    for error in errors:
        error_type = error.get("type", "processing_error")
        if error_type not in error_types:
            error_types[error_type] = 0
        error_types[error_type] += 1
    
    return {
        "total_errors": len(errors),
        "error_types": error_types,
        "sample_errors": errors[:10]  # First 10 errors as samples
    }
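
Because batch-level failures are recorded under batch_error:<job_id>:<batch_id>, failed batches can be resubmitted without re-running the whole job. A hedged sketch of that recovery path (resubmit_failed_batches is illustrative and not part of the tasks above; it re-derives each failed batch's slice from load_dataset):

@hy.task(queue="batch-processing")
def resubmit_failed_batches(job_id: str):
    """Resubmit only the batches that recorded a batch-level error."""
    metadata_str = HyrexKV.get(f"job_metadata:{job_id}")
    if not metadata_str:
        raise Exception(f"Job metadata not found for {job_id}")
    metadata = json.loads(metadata_str)

    items = load_dataset(metadata["total_items"], metadata["processing_type"])
    batch_size = metadata["batch_size"]

    resubmitted = []
    for batch_id in range(metadata["total_batches"]):
        if not HyrexKV.get(f"batch_error:{job_id}:{batch_id}"):
            continue
        process_batch.send(BatchContext(
            batch_id=batch_id,
            total_batches=metadata["total_batches"],
            items=items[batch_id * batch_size:(batch_id + 1) * batch_size],
            job_id=job_id,
            batch_size=batch_size,
            processing_type=metadata["processing_type"],
        ))
        resubmitted.append(batch_id)

    # A successful retry writes a batch_result key, which takes precedence
    # over the old batch_error entry during aggregation.
    return {"job_id": job_id, "resubmitted_batches": resubmitted}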

Usage Examples

Start Batch Processing Job

# Process large dataset with validation
curl -X POST http://localhost:8000/batch/start \
  -H "Content-Type: application/json" \
  -d '{
    "job_id": "validation_job_20241201",
    "total_items": 10000,
    "batch_size": 200,
    "processing_type": "data_validation"
  }'

# Check job progress
curl http://localhost:8000/batch/progress/validation_job_20241201

# Get final results
curl http://localhost:8000/batch/results/validation_job_20241201
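
The same flow can be driven from Python. A minimal client-side sketch using the requests library, assuming the FastAPI endpoints from the REST API Integration section below are serving on localhost:8000:

import time

import requests

BASE_URL = "http://localhost:8000"
JOB_ID = "validation_job_20241201"

resp = requests.post(f"{BASE_URL}/batch/start", json={
    "job_id": JOB_ID,
    "total_items": 10000,
    "batch_size": 200,
    "processing_type": "data_validation",
})
resp.raise_for_status()

# Poll progress; the progress key only exists after the first batch completes,
# so a 404 here just means "not started yet".
while True:
    progress = requests.get(f"{BASE_URL}/batch/progress/{JOB_ID}")
    if progress.status_code == 200 and progress.json()["completion_percentage"] >= 100:
        break
    time.sleep(10)

# The final report is written by aggregate_batch_results shortly after the
# last batch finishes, so it may lag 100% progress by a few seconds.
report = requests.get(f"{BASE_URL}/batch/results/{JOB_ID}").json()
print(report["success_rate"], report["total_items_processed"])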

Monitor Batch Processing

@hy.task
def monitor_batch_jobs():
    """Monitor active batch processing jobs"""
    
    active_jobs = get_active_batch_jobs()
    
    for job in active_jobs:
        progress = get_job_progress(job["job_id"])
        
        if progress["completion_percentage"] > 0:
            estimated_completion = calculate_eta(
                progress["completion_percentage"],
                job["started_at"]
            )
            
            print(f"Job {job['job_id']}: {progress['completion_percentage']:.1f}% "
                  f"complete, ETA: {estimated_completion}")

REST API Integration

import json

from fastapi import FastAPI, HTTPException
from hyrex import HyrexKV

from .tasks import BatchJobContext, process_large_dataset

app = FastAPI()

@app.post("/batch/start")
async def start_batch_job(request: BatchJobContext):
    """Start a new batch processing job"""
    
    task = process_large_dataset.send(request)
    
    return {
        "message": "Batch job started",
        "task_id": task.task_id,
        "job_id": request.job_id
    }

@app.get("/batch/progress/{job_id}")
async def get_batch_progress(job_id: str):
    """Get batch job progress"""
    
    progress_data = HyrexKV.get(f"job_progress:{job_id}")
    if not progress_data:
        raise HTTPException(status_code=404, detail="Job not found")
    
    return json.loads(progress_data)

@app.get("/batch/results/{job_id}")
async def get_batch_results(job_id: str):
    """Get final batch processing results"""
    
    results_data = HyrexKV.get(f"final_report:{job_id}")
    if not results_data:
        raise HTTPException(status_code=404, detail="Results not available")
    
    return json.loads(results_data)

Production Considerations

  • Memory management: Monitor memory usage with large batches
  • Error isolation: Ensure single batch failures don’t affect others
  • Progress persistence: Store progress to handle worker restarts
  • Result storage: Use appropriate storage for large result sets
  • Concurrency control: Balance throughput with resource constraints
  • Data consistency: Ensure atomic operations for critical data
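
The max_parallel_batches field on BatchJobContext is the natural hook for the concurrency-control point above, but the orchestrator as written submits every batch immediately. One possible way to throttle submission using only the progress key this example already maintains (outstanding_batches is an illustrative helper, not part of the tasks above):

def outstanding_batches(job_id: str, submitted: int) -> int:
    """Batches submitted but not yet marked complete in job_progress:<job_id>."""
    progress_data = HyrexKV.get(f"job_progress:{job_id}")
    completed = len(json.loads(progress_data)["completed_batches"]) if progress_data else 0
    return submitted - completed

# Inside the submission loop of process_large_dataset, before calling
# process_batch.send(batch_context):
#
#     while outstanding_batches(context.job_id, len(batch_tasks)) >= context.max_parallel_batches:
#         time.sleep(5)  # wait for in-flight batches to drain

Throttling inside the orchestrator keeps it running for the life of the job, so timeout_seconds on process_large_dataset must cover the full run; the simpler alternative is to run fewer workers against the batch-processing queue.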

Next Steps