Overview
This example demonstrates image processing automation:
- Multi-size generation from a single uploaded image
- Asynchronous processing to avoid blocking web requests
- CDN integration for optimized delivery
- Error handling for corrupted or invalid files
- Progress tracking for long-running operations
Task Definitions
from hyrex import HyrexRegistry
from pydantic import BaseModel
import requests
from PIL import Image
import io
import boto3
from typing import List, Tuple

hy = HyrexRegistry()
class ImageProcessingContext(BaseModel):
    image_url: str
    user_id: int
    sizes: List[Tuple[int, int]] = [(100, 100), (300, 300), (800, 800)]
    quality: int = 85
    output_format: str = "JPEG"
@hy.task(
    queue="image-processing",
    max_retries=3,
    timeout_seconds=300
)
def process_uploaded_image(context: ImageProcessingContext):
    """Generate multiple sizes of an uploaded image"""
    try:
        # Download original image
        response = requests.get(context.image_url, timeout=30)
        response.raise_for_status()

        # Open with PIL
        original_image = Image.open(io.BytesIO(response.content))

        # Convert to RGB if necessary (handles RGBA, etc.)
        if original_image.mode in ('RGBA', 'LA', 'P'):
            original_image = original_image.convert('RGB')

        processed_images = []
        s3_client = boto3.client('s3')

        for width, height in context.sizes:
            # Resize image maintaining aspect ratio
            resized_image = original_image.copy()
            resized_image.thumbnail((width, height), Image.Resampling.LANCZOS)

            # Save to an in-memory buffer
            buffer = io.BytesIO()
            resized_image.save(
                buffer,
                format=context.output_format,
                quality=context.quality,
                optimize=True
            )
            buffer.seek(0)

            # Upload to S3
            key = f"images/{context.user_id}/{width}x{height}.jpg"
            s3_client.upload_fileobj(
                buffer,
                'your-cdn-bucket',
                key,
                ExtraArgs={'ContentType': 'image/jpeg', 'ACL': 'public-read'}
            )

            processed_url = f"https://cdn.example.com/{key}"
            processed_images.append({
                "size": f"{width}x{height}",
                "url": processed_url,
                "width": resized_image.width,
                "height": resized_image.height
            })

        return {
            "success": True,
            "original": {
                "url": context.image_url,
                "width": original_image.width,
                "height": original_image.height
            },
            "processed": processed_images,
            "user_id": context.user_id,
            "formats_generated": len(processed_images)
        }
    except requests.RequestException as e:
        raise Exception(f"Failed to download image: {str(e)}")
    except Image.UnidentifiedImageError:
        raise Exception("Invalid or corrupted image file")
    except Exception as e:
        raise Exception(f"Image processing failed: {str(e)}")
@hy.task(max_retries=2)
def notify_processing_complete(user_id: int, processing_result: dict):
    """Notify user that image processing is complete"""
    if processing_result["success"]:
        # Send success notification
        send_notification(
            user_id,
            "Image processing complete!",
            f"Generated {processing_result['formats_generated']} optimized versions"
        )
    else:
        # Send failure notification
        send_notification(
            user_id,
            "Image processing failed",
            "Please try uploading a different image"
        )
    return {"notification_sent": True, "user_id": user_id}
@hy.task
def batch_process_images(image_contexts: List[ImageProcessingContext]):
    """Process multiple images in parallel"""
    task_ids = []
    for context in image_contexts:
        task = process_uploaded_image.send(context)
        task_ids.append({
            "task_id": task.task_id,
            "user_id": context.user_id,
            "image_url": context.image_url
        })
    return {
        "batch_started": True,
        "total_images": len(image_contexts),
        "task_ids": task_ids
    }
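Note that notify_processing_complete calls a send_notification helper that this example never defines; how notifications are delivered is up to your application. A minimal placeholder (logging only, purely an assumption for illustration) keeps the module self-contained:

import logging

logger = logging.getLogger(__name__)

def send_notification(user_id: int, title: str, body: str) -> None:
    """Hypothetical stand-in for a real delivery channel (email, push, in-app).

    Logging keeps the example runnable; swap in your own integration.
    """
    logger.info("Notification for user %s: %s - %s", user_id, title, body)

To actually deliver results to users, one option is to enqueue notify_processing_complete.send(context.user_id, result) at the end of process_uploaded_image, reusing the same .send() call shown in batch_process_images; this example leaves that wiring out.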
REST API Integration
from fastapi import FastAPI, File, Form, UploadFile, HTTPException
from pydantic import BaseModel
import boto3
import uuid

from .tasks import hy, process_uploaded_image, batch_process_images, ImageProcessingContext

app = FastAPI()
s3_client = boto3.client('s3')
class ProcessImageRequest(BaseModel):
    image_url: str
    user_id: int
    sizes: list[tuple[int, int]] = [(100, 100), (300, 300), (800, 800)]
@app.post("/images/upload")
async def upload_and_process_image(file: UploadFile = File(...), user_id: int = 1):
"""Upload image and trigger processing"""
# Validate file type
if not file.content_type or not file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="File must be an image")
try:
# Upload original to S3
file_extension = file.filename.split('.')[-1] if '.' in file.filename else 'jpg'
key = f"uploads/{user_id}/{uuid.uuid4()}.{file_extension}"
s3_client.upload_fileobj(
file.file,
'your-upload-bucket',
key,
ExtraArgs={'ContentType': file.content_type}
)
image_url = f"https://your-upload-bucket.s3.amazonaws.com/{key}"
# Queue processing task
task = process_uploaded_image.send(ImageProcessingContext(
image_url=image_url,
user_id=user_id
))
return {
"message": "Image uploaded and processing started",
"task_id": task.task_id,
"original_url": image_url,
"user_id": user_id
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@app.post("/images/process")
async def process_existing_image(request: ProcessImageRequest):
"""Process an existing image by URL"""
task = process_uploaded_image.send(ImageProcessingContext(**request.dict()))
return {
"message": "Image processing started",
"task_id": task.task_id,
"image_url": request.image_url
}
@app.post("/images/batch")
async def batch_process_images_endpoint(requests: list[ProcessImageRequest]):
"""Process multiple images in batch"""
contexts = [ImageProcessingContext(**req.dict()) for req in requests]
task = batch_process_images.send(contexts)
return {
"message": f"Batch processing started for {len(contexts)} images",
"task_id": task.task_id,
"total_images": len(contexts)
}
@app.get("/images/status/{task_id}")
async def get_processing_status(task_id: str):
"""Get image processing status"""
task = hy.get_task(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.is_complete else None,
"error": task.error if task.has_failed else None
}
Usage Examples
# Upload and process image
curl -X POST http://localhost:8000/images/upload \
  -F "file=@photo.jpg" \
  -F "user_id=123"
# Process existing image by URL
curl -X POST http://localhost:8000/images/process \
  -H "Content-Type: application/json" \
  -d '{
    "image_url": "https://example.com/original.jpg",
    "user_id": 123,
    "sizes": [[150, 150], [500, 500]]
  }'

# Check processing status
curl http://localhost:8000/images/status/task_12345
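The /images/batch endpoint defined earlier works the same way; its request body is simply a JSON list of the same objects (the URLs below are placeholders):

# Process several images in one batch
curl -X POST http://localhost:8000/images/batch \
  -H "Content-Type: application/json" \
  -d '[
    {"image_url": "https://example.com/a.jpg", "user_id": 123},
    {"image_url": "https://example.com/b.jpg", "user_id": 123}
  ]'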
Dependencies
Python Requirements
Pillow==10.0.0
boto3==1.29.0
requests==2.31.0
# Also imported by the examples above; pin versions as appropriate for your project
hyrex
fastapi
pydantic
TypeScript Dependencies
{
  "dependencies": {
    "sharp": "^0.32.0",
    "aws-sdk": "^2.1400.0",
    "node-fetch": "^3.3.0"
  }
}
Production Considerations
- File validation: Validate image types and sizes before processing (a minimal sketch follows this list)
- Storage optimization: Use appropriate S3 storage classes for different image sizes
- CDN configuration: Set up CloudFront for optimal image delivery
- Error handling: Implement fallbacks for corrupted or invalid images
- Memory management: Monitor memory usage for large image processing
- Cost optimization: Consider image processing costs and optimize accordingly
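As a concrete starting point for the file-validation item, here is a minimal upload-validation sketch; the 10 MB limit, the allowed content types, and the validate_upload helper name are assumptions to adapt to your own requirements:

from fastapi import HTTPException, UploadFile

# Hypothetical limits; tune these for your application
MAX_UPLOAD_BYTES = 10 * 1024 * 1024  # 10 MB
ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/webp"}

async def validate_upload(file: UploadFile) -> None:
    """Reject unsupported or oversized uploads before any task is queued."""
    if file.content_type not in ALLOWED_CONTENT_TYPES:
        raise HTTPException(status_code=400, detail="Unsupported image type")
    contents = await file.read()
    if len(contents) > MAX_UPLOAD_BYTES:
        raise HTTPException(status_code=400, detail="Image exceeds 10 MB limit")
    await file.seek(0)  # rewind so the file can still be uploaded to S3

For the memory-management item, Pillow's Image.MAX_IMAGE_PIXELS limit (its guard against decompression bombs) is also worth leaving enabled when processing user-supplied images.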