Build a comprehensive document processing pipeline that extracts text from various file formats including PDFs, Word documents, and images using OCR.

Overview

This example demonstrates how to process documents at scale:
  • Multi-format support - PDF, DOCX, and image files (JPG, PNG)
  • OCR processing - Extract text from images and scanned documents
  • Batch processing - Handle entire folders of documents
  • File upload handling - Process documents via REST API
  • Parallel extraction - Process multiple documents simultaneously
Perfect for building document management systems, content analysis pipelines, or digitization workflows.

Task Definitions

from hyrex import HyrexRegistry
import PyPDF2
import docx
from PIL import Image
import pytesseract
import os

# Registry instance; its `task` decorator (`@hy.task`) is applied to every
# task function defined in this module.
hy = HyrexRegistry()

@hy.task
def extract_text_from_pdf(file_path: str) -> str:
    """Return the text of every page of a PDF, concatenated in page order.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The page texts joined back to back; no separator is inserted
        between pages.
    """
    with open(file_path, 'rb') as pdf_file:
        pages = PyPDF2.PdfReader(pdf_file).pages
        # Extraction happens while the file is still open.
        combined = "".join(page.extract_text() for page in pages)
    return combined

@hy.task
def extract_text_from_docx(file_path: str) -> str:
    """Return the paragraph text of a Word document, one paragraph per line.

    Only ``doc.paragraphs`` is read, so content the document object exposes
    elsewhere is not included.

    Args:
        file_path: Path of the .docx file to read.

    Returns:
        Each paragraph's text followed by a newline character.
    """
    doc = docx.Document(file_path)
    # A real newline terminates each paragraph; the previous "\\n" literal
    # appended a backslash followed by the letter n instead.
    return "".join(f"{paragraph.text}\n" for paragraph in doc.paragraphs)

@hy.task
def extract_text_from_image(file_path: str) -> str:
    """Run OCR on an image file and return the recognized text.

    Args:
        file_path: Path of the image file to read.

    Returns:
        The string produced by ``pytesseract.image_to_string``.
    """
    # The context manager closes the underlying file handle once OCR is
    # done, so repeated calls in one worker do not accumulate open files.
    with Image.open(file_path) as image:
        return pytesseract.image_to_string(image)

@hy.task
def store_document_text(file_path: str, text: str):
    """Hand extracted text to the storage layer and summarize the result.

    Args:
        file_path: Path of the document the text was extracted from.
        text: The extracted text.

    Returns:
        A dict with ``document_id`` (the value returned by
        ``store_in_database``) and ``text_length`` (``len(text)``).
    """
    # Placeholder hook: `store_in_database` is not defined in this example;
    # supply your own storage logic under that name.
    stored_id = store_in_database(file_path, text)
    summary = {"document_id": stored_id, "text_length": len(text)}
    return summary

@hy.task
def process_document(file_path: str, file_type: str):
    """Extract a document's text with the matching extractor, then store it.

    Args:
        file_path: Path of the document to process.
        file_type: Format of the document: "pdf", "docx", "jpg", "jpeg" or
            "png". Matching ignores case, surrounding whitespace and a
            leading dot, so "PDF" and ".pdf" are accepted as well.

    Returns:
        A dict with ``file_path``, the normalized ``file_type``,
        ``extracted_text`` (the full text, or its first 500 characters
        followed by "..." when longer), ``text_length`` and the
        ``document_id`` reported by ``store_document_text``.

    Raises:
        ValueError: If ``file_type`` is not one of the supported formats.
    """
    normalized_type = file_type.strip().lower().lstrip(".")

    # One extraction task per supported format.
    extractors = {
        "pdf": extract_text_from_pdf,
        "docx": extract_text_from_docx,
        "jpg": extract_text_from_image,
        "jpeg": extract_text_from_image,
        "png": extract_text_from_image,
    }
    extractor = extractors.get(normalized_type)
    if extractor is None:
        raise ValueError(f"Unsupported file type: {file_type}")

    # Each step is dispatched as its own task and awaited before moving on.
    text = extractor.send(file_path).get()

    # Store extracted text in database
    result = store_document_text.send(file_path, text).get()

    preview = text[:500] + "..." if len(text) > 500 else text
    return {
        "file_path": file_path,
        "file_type": normalized_type,
        "extracted_text": preview,
        "text_length": len(text),
        "document_id": result["document_id"],
    }

@hy.task
def batch_process_documents(folder_path: str):
    """Enqueue a ``process_document`` task for each supported file in a folder.

    Only direct entries of ``folder_path`` are considered (no recursion).
    The tasks are dispatched but not awaited, so the returned list describes
    what was queued, not what has finished.

    Args:
        folder_path: Directory whose files should be processed.

    Returns:
        A dict with ``folder_path``, ``processed_files`` (one dict per queued
        file holding ``filename``, ``file_path`` and ``task_id``) and
        ``total_files`` (the number of queued files).
    """
    supported_extensions = {'pdf', 'docx', 'jpg', 'png', 'jpeg'}
    processed_files = []

    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        # splitext yields "" when there is no extension, so a file that is
        # merely *named* "pdf" is no longer mistaken for a PDF.
        file_ext = os.path.splitext(filename)[1][1:].lower()

        if file_ext not in supported_extensions:
            continue
        # Skip directories (and other non-files) whose name ends in a
        # supported extension.
        if not os.path.isfile(file_path):
            continue

        task = process_document.send(file_path, file_ext)
        processed_files.append({
            "filename": filename,
            "file_path": file_path,
            "task_id": task.task_id,
        })

    return {
        "folder_path": folder_path,
        "processed_files": processed_files,
        "total_files": len(processed_files),
    }

REST API Endpoints

from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel
from typing import List
import os
from .tasks import process_document, batch_process_documents

# FastAPI application object; the `@app.post` / `@app.get` decorators in this
# module register the endpoints on it.
app = FastAPI()

class ProcessingRequest(BaseModel):
    """JSON body of `POST /process/document`."""
    # Path of the document; forwarded unchanged to `process_document`.
    file_path: str
    # Format of the document (e.g. "pdf"); forwarded to `process_document`.
    file_type: str
    
class BatchRequest(BaseModel):
    """JSON body of `POST /process/batch`."""
    # Directory to scan; forwarded to `batch_process_documents`.
    folder_path: str
    # NOTE(review): the `/process/batch` handler in this module only reads
    # `folder_path`; `file_types` is accepted but never forwarded, and its
    # default omits "jpeg", which `batch_process_documents` does handle.
    file_types: List[str] = ["pdf", "docx", "jpg", "png"]

@app.post("/process/document")
async def process_single_document(request: ProcessingRequest):
    """Queue one document for processing and report the task that handles it.

    Args:
        request: Body carrying the document's ``file_path`` and ``file_type``.

    Returns:
        A dict with a confirmation ``message``, the queued ``task_id`` and
        the requested ``file_path``.
    """
    path, kind = request.file_path, request.file_type
    queued = process_document.send(path, kind)

    response = {"message": "Document processing started"}
    response["task_id"] = queued.task_id
    response["file_path"] = request.file_path
    return response

@app.post("/process/upload")
async def upload_and_process(file: UploadFile = File(...)):
    """Save an uploaded file under ``uploads/`` and queue it for processing.

    Args:
        file: The uploaded file from the multipart form field ``file``.

    Returns:
        A dict with a confirmation ``message``, the queued ``task_id`` and
        the ``filename`` the upload was stored under.

    Raises:
        HTTPException: 400 if the upload has no usable filename.
    """
    # Local import: the module-level fastapi import does not include it.
    from fastapi import HTTPException

    # The client controls `file.filename`. Keep only its final path component
    # so names such as "../../etc/cron.d/x" cannot escape the uploads
    # directory; backslashes are normalized so Windows-style paths are
    # stripped as well.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name)
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid or missing filename")

    # Save uploaded file
    os.makedirs("uploads", exist_ok=True)
    file_path = f"uploads/{safe_name}"
    with open(file_path, "wb") as buffer:
        buffer.write(await file.read())

    # Determine file type and process. splitext yields "" for a name without
    # an extension, so a file merely named "pdf" is not treated as a PDF.
    file_type = os.path.splitext(safe_name)[1][1:].lower()
    task = process_document.send(file_path, file_type)

    return {
        "message": "File uploaded and processing started",
        "task_id": task.task_id,
        "filename": safe_name,
    }

@app.post("/process/batch")
async def batch_process(request: BatchRequest):
    """Queue a batch task for a folder and report the task that handles it.

    Args:
        request: Body carrying the ``folder_path`` to process.

    Returns:
        A dict with a confirmation ``message``, the queued ``task_id`` and
        the requested ``folder_path``.
    """
    folder = request.folder_path
    queued = batch_process_documents.send(folder)

    response = {"message": "Batch processing started"}
    response["task_id"] = queued.task_id
    response["folder_path"] = request.folder_path
    return response

@app.get("/process/status/{task_id}")
async def get_processing_status(task_id: str):
    """Look up a task by id and report its status.

    Args:
        task_id: Identifier returned by one of the ``/process/*`` endpoints.

    Returns:
        A dict with ``task_id``, ``status``, ``result`` (only when the task
        reports ``is_complete``, otherwise ``None``), ``created_at`` and
        ``completed_at``.
    """
    # This module imports only the task functions from `.tasks`, so the
    # registry is pulled in here; without it `hy` is an undefined name.
    from .tasks import hy

    # NOTE(review): `hy.get_task` and the attributes read below are used
    # exactly as in the original example; confirm them against the Hyrex
    # API, including what is returned for an unknown task id.
    task = hy.get_task(task_id)

    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.is_complete else None,
        "created_at": task.created_at,
        "completed_at": task.completed_at,
    }

Usage Examples

Process Single Document

# Process a specific file
curl -X POST http://localhost:8000/process/document \
  -H "Content-Type: application/json" \
  -d '{
    "file_path": "/documents/report.pdf",
    "file_type": "pdf"
  }'

# Upload and process
curl -X POST http://localhost:8000/process/upload \
  -F "file=@/path/to/document.pdf"

Batch Processing

# Process entire folder
curl -X POST http://localhost:8000/process/batch \
  -H "Content-Type: application/json" \
  -d '{
    "folder_path": "/documents/inbox"
  }'

# Check processing status
curl http://localhost:8000/process/status/task_12345

Advanced Processing Patterns

Document Classification

@hy.task
def classify_document(text: str, file_path: str):
    """Classify a document by counting category keywords in its text.

    Matching is a case-insensitive substring test, so a keyword also counts
    when it appears inside a longer word.

    Args:
        text: Extracted document text.
        file_path: Path of the document; echoed back in the result.

    Returns:
        A dict with ``file_path``, ``document_type`` ("invoice", "contract",
        "report", or "unknown" when no keyword matches) and
        ``confidence_scores`` (matched-keyword count per category).
    """
    # Simple keyword-based classification
    keywords = {
        'invoice': ['invoice', 'bill', 'amount due', 'payment'],
        'contract': ['agreement', 'terms', 'conditions', 'parties'],
        'report': ['analysis', 'summary', 'findings', 'conclusion']
    }

    text_lower = text.lower()
    scores = {
        doc_type: sum(1 for word in words if word in text_lower)
        for doc_type, words in keywords.items()
    }

    # On a tie, max() keeps the first category in declaration order.
    best_type = max(scores, key=scores.get)
    # A top score of zero means nothing matched; report 'unknown' instead of
    # defaulting to the first category.
    document_type = best_type if scores[best_type] > 0 else 'unknown'

    return {
        "file_path": file_path,
        "document_type": document_type,
        "confidence_scores": scores
    }

Text Enhancement

@hy.task  
def enhance_extracted_text(text: str):
    """Collapse whitespace and apply single-character OCR corrections.

    Args:
        text: Raw extracted text.

    Returns:
        A dict with ``original_length`` (length of ``text`` as passed in),
        ``enhanced_text`` (the cleaned text) and ``enhanced_length``.
    """
    import re

    # Measure before any normalization so the value is the true input length.
    original_length = len(text)

    # Remove excessive whitespace
    normalized = re.sub(r'\s+', ' ', text)

    # Fix common OCR errors
    replacements = {
        'l': 'I',  # Common OCR mistake
        '0': 'O',  # Zero vs letter O
        '5': 'S',  # Five vs letter S
    }

    # A key is replaced only where it stands alone between word boundaries.
    # NOTE(review): this also rewrites genuine standalone digits ("5 items"
    # becomes "S items", "3.5" becomes "3.S"); reconsider the digit rules
    # before using this on text that contains numbers.
    enhanced_text = normalized
    for old, new in replacements.items():
        # re.escape keeps the pattern literal if a key ever contains a
        # regex metacharacter.
        enhanced_text = re.sub(rf'\b{re.escape(old)}\b', new, enhanced_text)

    return {
        "original_length": original_length,
        "enhanced_text": enhanced_text,
        "enhanced_length": len(enhanced_text)
    }

Dependencies Setup

Python Requirements

hyrex
PyPDF2==3.0.1
python-docx==0.8.11
Pillow==10.0.0
# pytesseract is a wrapper; the Tesseract OCR engine must also be installed on the system
pytesseract==0.3.10
fastapi==0.104.1
python-multipart==0.0.6

TypeScript Dependencies

{
  "dependencies": {
    "hyrex": "latest",
    "pdf-extract": "^1.0.7",
    "mammoth": "^1.6.0",
    "tesseract.js": "^4.1.4",
    "express": "^4.18.2",
    "multer": "^1.4.5-lts.1"
  },
  "devDependencies": {
    "@types/express": "^4.17.21",
    "@types/multer": "^1.4.11"
  }
}

Production Considerations

  • File storage: Use cloud storage (S3, GCS) for large-scale processing
  • OCR accuracy: Consider specialized OCR services for better accuracy
  • Memory management: Process large documents in chunks to avoid memory issues
  • Error handling: Implement fallback strategies for corrupted files
  • Security: Validate file types and scan for malware before processing
  • Performance: Use parallel processing for batch operations

Next Steps