Overview
This example demonstrates image processing automation:
- Multi-size generation from single uploaded images
- Asynchronous processing to avoid blocking web requests
- CDN integration for optimized delivery
- Error handling for corrupted or invalid files
- Progress tracking for long-running operations
Task Definitions
from hyrex import HyrexRegistry
from pydantic import BaseModel
import requests
from PIL import Image
import io
import boto3
from typing import List, Tuple
# Module-level registry; the @hy.task decorators below are taken from it.
hy = HyrexRegistry()
class ImageProcessingContext(BaseModel):
    """Input payload for the ``process_uploaded_image`` task.

    Only ``image_url`` and ``user_id`` are required; the remaining fields
    carry defaults.
    """

    # URL the original image is downloaded from.
    image_url: str
    # Owner of the image; used as part of the S3 key prefix.
    user_id: int
    # (width, height) bounding boxes; one resized rendition is produced per entry.
    sizes: List[Tuple[int, int]] = [(100, 100), (300, 300), (800, 800)]
    # Passed to PIL's ``Image.save`` as ``quality``.
    quality: int = 85
    # Passed to PIL's ``Image.save`` as ``format``.
    output_format: str = "JPEG"
def _output_target(output_format: str) -> Tuple[str, str]:
    """Return the ``(file extension, MIME type)`` pair for a PIL format name."""
    import mimetypes

    normalized = output_format.lower()
    # PIL calls the format "JPEG", but the conventional file extension is "jpg".
    extension = "jpg" if normalized == "jpeg" else normalized
    guessed, _ = mimetypes.guess_type(f"image.{extension}")
    # Fall back to image/<format> when the platform has no mapping for it.
    return extension, guessed or f"image/{normalized}"


@hy.task(
    queue="image-processing",
    max_retries=3,
    timeout_seconds=300
)
def process_uploaded_image(context: ImageProcessingContext):
    """Generate multiple sizes of an uploaded image and upload them to S3.

    Args:
        context: Source URL, owning user, target bounding boxes, quality and
            output format for the renditions.

    Returns:
        A dict with ``success``, the ``original`` URL and dimensions, the list
        of ``processed`` renditions (requested size label, URL, actual width
        and height), the ``user_id`` and ``formats_generated``.

    Raises:
        Exception: If the download fails, the payload is not a readable
            image, or any later processing/upload step fails. The original
            error is attached as ``__cause__``.
    """
    import hashlib

    # Extension and Content-Type must follow the format actually written;
    # they used to be hard-coded to JPEG regardless of ``output_format``.
    extension, content_type = _output_target(context.output_format)

    # Stable per-source identifier so that different images belonging to the
    # same user do not overwrite each other's renditions, while a retry of
    # the same task still writes to the same keys.
    source_id = hashlib.sha256(context.image_url.encode("utf-8")).hexdigest()[:16]

    try:
        # Download original image
        response = requests.get(context.image_url, timeout=30)
        response.raise_for_status()

        # Open with PIL
        original_image = Image.open(io.BytesIO(response.content))

        # Convert to RGB if necessary (handles RGBA, etc.)
        if original_image.mode in ('RGBA', 'LA', 'P'):
            original_image = original_image.convert('RGB')

        processed_images = []
        s3_client = boto3.client('s3')

        for width, height in context.sizes:
            # thumbnail() resizes in place and keeps the aspect ratio, so work
            # on a copy to leave the original available for the next size.
            resized_image = original_image.copy()
            resized_image.thumbnail((width, height), Image.Resampling.LANCZOS)

            # Save to buffer
            buffer = io.BytesIO()
            resized_image.save(
                buffer,
                format=context.output_format,
                quality=context.quality,
                optimize=True
            )
            buffer.seek(0)

            # Upload to S3
            key = f"images/{context.user_id}/{source_id}/{width}x{height}.{extension}"
            s3_client.upload_fileobj(
                buffer,
                'your-cdn-bucket',
                key,
                ExtraArgs={'ContentType': content_type, 'ACL': 'public-read'}
            )

            processed_url = f"https://cdn.example.com/{key}"
            processed_images.append({
                "size": f"{width}x{height}",
                "url": processed_url,
                "width": resized_image.width,
                "height": resized_image.height
            })

        return {
            "success": True,
            "original": {
                "url": context.image_url,
                "width": original_image.width,
                "height": original_image.height
            },
            "processed": processed_images,
            "user_id": context.user_id,
            "formats_generated": len(processed_images)
        }
    except requests.RequestException as e:
        raise Exception(f"Failed to download image: {str(e)}") from e
    except Image.UnidentifiedImageError as e:
        raise Exception("Invalid or corrupted image file") from e
    except Exception as e:
        raise Exception(f"Image processing failed: {str(e)}") from e
@hy.task(max_retries=2)
def notify_processing_complete(user_id: int, processing_result: dict):
    """Tell a user whether their image processing succeeded.

    Args:
        user_id: Recipient of the notification.
        processing_result: Result dict; ``success`` is always read, and
            ``formats_generated`` is read when ``success`` is truthy.

    Returns:
        ``{"notification_sent": True, "user_id": user_id}``.
    """
    # NOTE(review): send_notification is not defined in this snippet; it is
    # expected to be provided elsewhere in the module.
    if processing_result["success"]:
        title = "Image processing complete!"
        body = f"Generated {processing_result['formats_generated']} optimized versions"
    else:
        title = "Image processing failed"
        body = "Please try uploading a different image"

    send_notification(user_id, title, body)
    return {"notification_sent": True, "user_id": user_id}
@hy.task
def batch_process_images(image_contexts: List[ImageProcessingContext]):
    """Enqueue one ``process_uploaded_image`` task per context.

    Args:
        image_contexts: Contexts to dispatch, in order.

    Returns:
        A dict with ``batch_started``, ``total_images`` and ``task_ids``
        (one entry per context holding the task id, user id and image URL).
    """

    def _enqueue(ctx):
        # Dispatch a single image and describe the resulting task.
        handle = process_uploaded_image.send(ctx)
        return {
            "task_id": handle.task_id,
            "user_id": ctx.user_id,
            "image_url": ctx.image_url,
        }

    dispatched = [_enqueue(ctx) for ctx in image_contexts]
    return {
        "batch_started": True,
        "total_images": len(image_contexts),
        "task_ids": dispatched,
    }
REST API Integration
import uuid

import boto3
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel

from .tasks import process_uploaded_image, batch_process_images
from .tasks import ImageProcessingContext, hy
# FastAPI application the route decorators below attach to.
app = FastAPI()
# Module-level S3 client used by the upload endpoint.
s3_client = boto3.client('s3')
class ProcessImageRequest(BaseModel):
    """Request body for processing an image that is already reachable by URL."""

    # URL of the image to process.
    image_url: str
    # User the image belongs to.
    user_id: int
    # (width, height) bounding boxes for the generated renditions.
    sizes: list[tuple[int, int]] = [(100, 100), (300, 300), (800, 800)]
def _safe_extension(filename):
    """Return an ASCII-alphanumeric extension taken from *filename*, or ``'jpg'``.

    The filename comes from the client, so anything that is not a plain
    letter or digit is dropped before the value is used in an S3 key.
    """
    if not filename or '.' not in filename:
        return 'jpg'
    candidate = filename.rsplit('.', 1)[-1]
    cleaned = ''.join(ch for ch in candidate if ch.isascii() and ch.isalnum())
    return cleaned or 'jpg'


@app.post("/images/upload")
async def upload_and_process_image(file: UploadFile = File(...), user_id: int = 1):
    """Upload image and trigger processing.

    Args:
        file: Multipart upload; its declared content type must start with
            ``image/``.
        user_id: Owner of the upload. Declared as a plain scalar, so FastAPI
            reads it from the query string.

    Returns:
        A dict with a status ``message``, the queued ``task_id``, the
        ``original_url`` of the stored upload and the ``user_id``.

    Raises:
        HTTPException: 400 if the upload is not declared as an image; 500 if
            storing the file or queueing the task fails.
    """
    # Validate file type
    if not file.content_type or not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")

    try:
        # Upload original to S3. The filename may be missing and is
        # client-controlled, so derive the extension defensively.
        file_extension = _safe_extension(file.filename)
        key = f"uploads/{user_id}/{uuid.uuid4()}.{file_extension}"

        # NOTE(review): upload_fileobj is a synchronous boto3 call inside an
        # async handler; consider offloading it to a thread under load.
        s3_client.upload_fileobj(
            file.file,
            'your-upload-bucket',
            key,
            ExtraArgs={'ContentType': file.content_type}
        )
        image_url = f"https://your-upload-bucket.s3.amazonaws.com/{key}"

        # Queue processing task
        task = process_uploaded_image.send(ImageProcessingContext(
            image_url=image_url,
            user_id=user_id
        ))

        return {
            "message": "Image uploaded and processing started",
            "task_id": task.task_id,
            "original_url": image_url,
            "user_id": user_id
        }
    except Exception as e:
        # Keep the original error attached for server-side diagnostics.
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e
@app.post("/images/process")
async def process_existing_image(request: ProcessImageRequest):
    """Queue processing for an image that is already available at a URL.

    Returns a dict with a status ``message``, the queued ``task_id`` and the
    ``image_url`` that was submitted.
    """
    # The request fields map one-to-one onto the task context.
    context = ImageProcessingContext(**request.dict())
    queued = process_uploaded_image.send(context)

    return {
        "message": "Image processing started",
        "task_id": queued.task_id,
        "image_url": request.image_url,
    }
@app.post("/images/batch")
async def batch_process_images_endpoint(requests: list[ProcessImageRequest]):
    """Queue a single batch task covering every submitted image.

    Returns a dict with a status ``message``, the batch ``task_id`` and
    ``total_images``.
    """
    # Convert each request body into the task's context model.
    contexts = []
    for item in requests:
        contexts.append(ImageProcessingContext(**item.dict()))

    batch = batch_process_images.send(contexts)
    total = len(contexts)

    return {
        "message": f"Batch processing started for {total} images",
        "task_id": batch.task_id,
        "total_images": total,
    }
@app.get("/images/status/{task_id}")
async def get_processing_status(task_id: str):
    """Report the status of a previously queued task.

    Returns a dict with ``task_id``, ``status``, ``result`` (only when the
    task is complete, otherwise ``None``) and ``error`` (only when the task
    has failed, otherwise ``None``).
    """
    task = hy.get_task(task_id)

    payload = {
        "task_id": task_id,
        "status": task.status,
        "result": None,
        "error": None,
    }
    # Only expose the result/error once the task reports the matching state.
    if task.is_complete:
        payload["result"] = task.result
    if task.has_failed:
        payload["error"] = task.error
    return payload
Usage Examples
# Upload and process image
curl -X POST "http://localhost:8000/images/upload?user_id=123" \
-F "file=@photo.jpg"
# Process existing image by URL
curl -X POST http://localhost:8000/images/process \
-H "Content-Type: application/json" \
-d '{
"image_url": "https://example.com/original.jpg",
"user_id": 123,
"sizes": [[150, 150], [500, 500]]
}'
# Check processing status
curl http://localhost:8000/images/status/task_12345
Dependencies
Python Requirements
Pillow==10.0.0
boto3==1.29.0
requests==2.31.0
TypeScript Dependencies
{
"dependencies": {
"sharp": "^0.32.0",
"aws-sdk": "^2.1400.0",
"node-fetch": "^3.3.0"
}
}
Production Considerations
- File validation: Validate image types and sizes before processing
- Storage optimization: Use appropriate S3 storage classes for different image sizes
- CDN configuration: Set up CloudFront for optimal image delivery
- Error handling: Implement fallbacks for corrupted or invalid images
- Memory management: Monitor memory usage for large image processing
- Cost optimization: Consider image processing costs and optimize accordingly
Next Steps
User Onboarding
Automate profile picture processing
Maintenance Tasks
Clean up processed images