from hyrex import HyrexRegistry
from pydantic import BaseModel
import requests
from PIL import Image
import io
import boto3
from typing import List, Tuple
# Registry instance whose `task` decorator is applied to the task functions in this module.
hy = HyrexRegistry()
class ImageProcessingContext(BaseModel):
    """Input payload describing one image to download, resize and re-encode."""

    # URL the original image is downloaded from.
    image_url: str
    # Owner of the image; becomes part of the storage key of each variant.
    user_id: int
    # Bounding boxes as (width, height) pairs; one output variant per entry.
    sizes: List[Tuple[int, int]] = [
        (100, 100),
        (300, 300),
        (800, 800),
    ]
    # Quality setting forwarded to the image save call.
    quality: int = 85
    # Format name forwarded to the image save call.
    output_format: str = "JPEG"
@hy.task(
    queue="image-processing",
    max_retries=3,
    timeout_seconds=300
)
def process_uploaded_image(context: ImageProcessingContext):
    """Download an image and upload one resized variant per requested size.

    The image at ``context.image_url`` is fetched, converted to RGB when its
    mode is RGBA, LA or P, and shrunk with ``Image.thumbnail`` to fit inside
    each ``(width, height)`` box in ``context.sizes``. Every variant is encoded
    as ``context.output_format`` and uploaded to S3 with a public-read ACL.

    Args:
        context: Source URL, owning user, target sizes, quality and output
            format for this job.

    Returns:
        A dict with ``success`` (always ``True`` on return), ``original``
        (source URL and dimensions), ``processed`` (one entry per size with
        the requested box, CDN URL and actual dimensions), ``user_id`` and
        ``formats_generated``.

    Raises:
        Exception: If the download fails, the payload is not a recognizable
            image, or any later step (resize, encode, upload) fails. The
            underlying error is attached as ``__cause__``.
    """
    try:
        # Download original image
        response = requests.get(context.image_url, timeout=30)
        response.raise_for_status()

        # Open with PIL
        original_image = Image.open(io.BytesIO(response.content))

        # Convert to RGB if necessary (handles RGBA, etc.)
        if original_image.mode in ('RGBA', 'LA', 'P'):
            original_image = original_image.convert('RGB')

        # Derive the key extension and Content-Type from the configured output
        # format instead of hard-coding JPEG, so e.g. PNG output is not stored
        # as ".jpg" and served as "image/jpeg". The default "JPEG" keeps the
        # ".jpg" extension and "image/jpeg" type used before.
        format_name = context.output_format.upper()
        extension = "jpg" if format_name == "JPEG" else format_name.lower()
        content_type = f"image/{format_name.lower()}"

        processed_images = []
        s3_client = boto3.client('s3')

        for width, height in context.sizes:
            # Resize a copy so the original stays available for later sizes;
            # thumbnail() resizes in place while maintaining aspect ratio.
            resized_image = original_image.copy()
            resized_image.thumbnail((width, height), Image.Resampling.LANCZOS)

            # Save to buffer
            buffer = io.BytesIO()
            resized_image.save(
                buffer,
                format=context.output_format,
                quality=context.quality,
                optimize=True
            )
            # Rewind so the upload reads from the start of the encoded data.
            buffer.seek(0)

            # Upload to S3
            # NOTE(review): the key depends only on user_id and the requested
            # size, so processing another image for the same user writes to
            # the same keys. Confirm whether overwriting is intended.
            key = f"images/{context.user_id}/{width}x{height}.{extension}"
            s3_client.upload_fileobj(
                buffer,
                'your-cdn-bucket',
                key,
                ExtraArgs={'ContentType': content_type, 'ACL': 'public-read'}
            )

            processed_url = f"https://cdn.example.com/{key}"
            processed_images.append({
                # "size" is the requested box; width/height are the actual
                # dimensions after the aspect-preserving resize.
                "size": f"{width}x{height}",
                "url": processed_url,
                "width": resized_image.width,
                "height": resized_image.height
            })

        return {
            "success": True,
            "original": {
                "url": context.image_url,
                "width": original_image.width,
                "height": original_image.height
            },
            "processed": processed_images,
            "user_id": context.user_id,
            "formats_generated": len(processed_images)
        }
    except requests.RequestException as e:
        raise Exception(f"Failed to download image: {str(e)}") from e
    except Image.UnidentifiedImageError as e:
        raise Exception("Invalid or corrupted image file") from e
    except Exception as e:
        raise Exception(f"Image processing failed: {str(e)}") from e
@hy.task(max_retries=2)
def notify_processing_complete(user_id: int, processing_result: dict):
    """Send the user a success or failure notification for a processing run.

    Reads ``processing_result["success"]`` to pick the message; on success the
    message body includes ``processing_result["formats_generated"]``.

    Returns:
        A dict with ``notification_sent`` set to ``True`` and the ``user_id``.
    """
    if not processing_result["success"]:
        # Failure path: fixed wording, no details from the result are used.
        title = "Image processing failed"
        message = "Please try uploading a different image"
    else:
        # Success path: report how many variants were produced.
        title = "Image processing complete!"
        message = f"Generated {processing_result['formats_generated']} optimized versions"

    send_notification(user_id, title, message)

    return {"notification_sent": True, "user_id": user_id}
@hy.task
def batch_process_images(image_contexts: List[ImageProcessingContext]):
    """Enqueue one ``process_uploaded_image`` task per context.

    Each context is handed to ``process_uploaded_image.send``; the returned
    task's ``task_id`` is recorded alongside the context's user and URL.

    Returns:
        A dict with ``batch_started`` (``True``), ``total_images`` and
        ``task_ids`` (one ``task_id``/``user_id``/``image_url`` entry per
        context, in input order).
    """
    dispatched = [
        {
            "task_id": process_uploaded_image.send(ctx).task_id,
            "user_id": ctx.user_id,
            "image_url": ctx.image_url,
        }
        for ctx in image_contexts
    ]

    summary = {
        "batch_started": True,
        "total_images": len(image_contexts),
        "task_ids": dispatched,
    }
    return summary