Documentation Index — fetch the complete documentation index at: https://hyrex.io/docs/llms.txt
Use this file to discover all available pages before exploring further.
Build a comprehensive document processing pipeline that extracts text from various file formats including PDFs, Word documents, and images using OCR.
Overview
This example demonstrates how to process documents at scale:
Multi-format support - PDF, DOCX, and image files (JPG, PNG)
OCR processing - Extract text from images and scanned documents
Batch processing - Handle entire folders of documents
File upload handling - Process documents via REST API
Parallel extraction - Process multiple documents simultaneously
Perfect for building document management systems, content analysis pipelines, or digitization workflows.
Task Definitions
from hyrex import HyrexRegistry
import PyPDF2
import docx
from PIL import Image
import pytesseract
import os
# Module-level Hyrex task registry; the @hy.task decorators below register
# each function as a distributed task on this registry.
hy = HyrexRegistry()
@hy.task
def extract_text_from_pdf(file_path: str) -> str:
    """Extract the text of every page of a PDF file.

    Args:
        file_path: Path to a PDF file on local disk.

    Returns:
        The concatenated text of all pages.
    """
    with open(file_path, "rb") as pdf_file:
        reader = PyPDF2.PdfReader(pdf_file)
        # join() is linear; the original `text += ...` loop is quadratic on
        # large documents. `or ""` guards pages where extract_text() yields
        # no text (scanned/image-only pages).
        return "".join(page.extract_text() or "" for page in reader.pages)
@hy.task
def extract_text_from_docx(file_path: str) -> str:
    """Extract text from a Word (.docx) document.

    Args:
        file_path: Path to a .docx file on local disk.

    Returns:
        Paragraph texts joined by newlines.
    """
    doc = docx.Document(file_path)
    # The original listing appended the escaped sequence " \\ n" (a literal
    # backslash plus 'n', not a newline); join paragraphs with real newlines.
    return "\n".join(paragraph.text for paragraph in doc.paragraphs)
@hy.task
def extract_text_from_image(file_path: str) -> str:
    """Run OCR over an image file and return the recognized text.

    Args:
        file_path: Path to a JPG/PNG image on local disk.

    Returns:
        Whatever text pytesseract recognized in the image.
    """
    return pytesseract.image_to_string(Image.open(file_path))
@hy.task
def store_document_text(file_path: str, text: str):
    """Persist extracted text and return a summary of what was stored.

    Args:
        file_path: Path of the source document.
        text: Text previously extracted from that document.

    Returns:
        Dict with the new ``document_id`` and the stored ``text_length``.
    """
    # store_in_database is a placeholder for your own persistence layer;
    # it is not defined in this listing.
    doc_id = store_in_database(file_path, text)
    return {"document_id": doc_id, "text_length": len(text)}
@hy.task
def process_document(file_path: str, file_type: str):
    """Extract text from a document via the extractor matching its type,
    store the result, and return a processing summary.

    Args:
        file_path: Path to the document on local disk.
        file_type: Lower-case extension without the dot (pdf/docx/jpg/...).

    Returns:
        Dict with the file info, a 500-char text preview, the full text
        length, and the database ``document_id``.

    Raises:
        ValueError: If ``file_type`` is not a supported extension.
    """
    # Dispatch table replaces the if/elif chain; all image extensions share
    # the OCR extractor.
    extractors = {
        "pdf": extract_text_from_pdf,
        "docx": extract_text_from_docx,
        "jpg": extract_text_from_image,
        "jpeg": extract_text_from_image,
        "png": extract_text_from_image,
    }
    extractor = extractors.get(file_type)
    if extractor is None:
        raise ValueError(f"Unsupported file type: {file_type}")
    text = extractor.send(file_path).get()
    # Store extracted text in database
    result = store_document_text.send(file_path, text).get()
    preview = text[:500] + "..." if len(text) > 500 else text
    return {
        "file_path": file_path,
        "file_type": file_type,
        "extracted_text": preview,
        "text_length": len(text),
        "document_id": result["document_id"],
    }
@hy.task
def batch_process_documents(folder_path: str):
    """Fan out a processing task for every supported document in a folder.

    Args:
        folder_path: Directory to scan (non-recursive).

    Returns:
        Dict with the folder path, one entry (filename/path/task_id) per
        dispatched file, and the total count.
    """
    supported_extensions = {'pdf', 'docx', 'jpg', 'png', 'jpeg'}
    processed_files = []
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        # splitext handles dotless names ("README") correctly, where the
        # original filename.split('.')[-1] treated the whole name as an
        # extension; isfile() skips subdirectories whose names happen to
        # end in a supported extension.
        file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
        if file_ext in supported_extensions and os.path.isfile(file_path):
            task = process_document.send(file_path, file_ext)
            processed_files.append({
                "filename": filename,
                "file_path": file_path,
                "task_id": task.task_id,
            })
    return {
        "folder_path": folder_path,
        "processed_files": processed_files,
        "total_files": len(processed_files),
    }
REST API Endpoints
Python (FastAPI)
TypeScript (Express)
import os
from typing import List

from fastapi import FastAPI, File, HTTPException, UploadFile
from pydantic import BaseModel

from .tasks import batch_process_documents, hy, process_document
# FastAPI application exposing the document-processing REST endpoints below.
app = FastAPI()
class ProcessingRequest(BaseModel):
    """Request body for /process/document: which file to process and how."""

    file_path: str
    file_type: str
class BatchRequest(BaseModel):
    """Request body for /process/batch: the folder to scan.

    ``file_types`` is accepted in the payload but the batch endpoint only
    forwards ``folder_path``; extension filtering happens in the task.
    """

    folder_path: str
    file_types: List[str] = ["pdf", "docx", "jpg", "png"]
@app.post ( "/process/document" )
async def process_single_document ( request : ProcessingRequest):
"""Process a single document"""
task = process_document.send(request.file_path, request.file_type)
return {
"message" : "Document processing started" ,
"task_id" : task.task_id,
"file_path" : request.file_path
}
@app.post ( "/process/upload" )
async def upload_and_process ( file : UploadFile = File( ... )):
"""Upload and process a document"""
# Save uploaded file
file_path = f "uploads/ { file .filename } "
os.makedirs( "uploads" , exist_ok = True )
with open (file_path, "wb" ) as buffer:
content = await file .read()
buffer.write(content)
# Determine file type and process
file_type = file .filename.split( '.' )[ - 1 ].lower()
task = process_document.send(file_path, file_type)
return {
"message" : "File uploaded and processing started" ,
"task_id" : task.task_id,
"filename" : file .filename
}
@app.post ( "/process/batch" )
async def batch_process ( request : BatchRequest):
"""Batch process documents in a folder"""
task = batch_process_documents.send(request.folder_path)
return {
"message" : "Batch processing started" ,
"task_id" : task.task_id,
"folder_path" : request.folder_path
}
@app.get ( "/process/status/ {task_id} " )
async def get_processing_status ( task_id : str ):
"""Get document processing status"""
task = hy.get_task(task_id)
return {
"task_id" : task_id,
"status" : task.status,
"result" : task.result if task.is_complete else None ,
"created_at" : task.created_at,
"completed_at" : task.completed_at
}
Usage Examples
Process Single Document
# Process a specific file
curl -X POST http://localhost:8000/process/document \
-H "Content-Type: application/json" \
-d '{
"file_path": "/documents/report.pdf",
"file_type": "pdf"
}'
# Upload and process
curl -X POST http://localhost:8000/process/upload \
-F "file=@/path/to/document.pdf"
Batch Processing
# Process entire folder
curl -X POST http://localhost:8000/process/batch \
-H "Content-Type: application/json" \
-d '{
"folder_path": "/documents/inbox"
}'
# Check processing status
curl http://localhost:8000/process/status/task_12345
Advanced Processing Patterns
Document Classification
@hy.task
def classify_document(text: str, file_path: str):
    """Classify a document by keyword frequency in its extracted text.

    Args:
        text: Text previously extracted from the document.
        file_path: Path of the source document (echoed back in the result).

    Returns:
        Dict with the winning ``document_type`` ('unknown' when no keyword
        matched) and the per-type ``confidence_scores``.
    """
    # Simple keyword-based classification
    keywords = {
        'invoice': ['invoice', 'bill', 'amount due', 'payment'],
        'contract': ['agreement', 'terms', 'conditions', 'parties'],
        'report': ['analysis', 'summary', 'findings', 'conclusion'],
    }
    text_lower = text.lower()
    scores = {
        doc_type: sum(1 for word in words if word in text_lower)
        for doc_type, words in keywords.items()
    }
    # The original took max() even when every score was 0, mislabelling
    # keyword-free documents as the first type; treat that case as unknown.
    best = max(scores, key=scores.get)
    document_type = best if scores[best] > 0 else 'unknown'
    return {
        "file_path": file_path,
        "document_type": document_type,
        "confidence_scores": scores,
    }
Text Enhancement
@hy.task
def enhance_extracted_text(text: str):
    """Normalize whitespace and repair a common OCR confusion in text.

    Args:
        text: Raw OCR/extraction output.

    Returns:
        Dict with the pre-cleaning length, the enhanced text, and the
        enhanced length.
    """
    import re

    # Collapse runs of whitespace to single spaces.
    cleaned = re.sub(r'\s+', ' ', text)
    # Only the lone-letter 'l' -> 'I' fix is safe to apply blindly. The
    # original also mapped standalone '0' -> 'O' and '5' -> 'S', which
    # corrupts any isolated digits (prices, dates, ids); those blanket
    # replacements are dropped. Use a real OCR post-corrector for more.
    enhanced_text = re.sub(r'\bl\b', 'I', cleaned)
    return {
        # len(text) is the true pre-cleaning length; the original measured
        # it after whitespace collapsing due to variable reuse.
        "original_length": len(text),
        "enhanced_text": enhanced_text,
        "enhanced_length": len(enhanced_text),
    }
Dependencies Setup
Python Requirements
PyPDF2==3.0.1
python-docx==0.8.11
Pillow==10.0.0
pytesseract==0.3.10
fastapi==0.104.1
python-multipart==0.0.6
TypeScript Dependencies
{
"dependencies" : {
"hyrex" : "latest" ,
"pdf-extract" : "^1.0.7" ,
"mammoth" : "^1.6.0" ,
"tesseract.js" : "^4.1.4" ,
"express" : "^4.18.2" ,
"multer" : "^1.4.5-lts.1"
},
"devDependencies" : {
"@types/express" : "^4.17.21" ,
"@types/multer" : "^1.4.11"
}
}
Production Considerations
File storage : Use cloud storage (S3, GCS) for large-scale processing
OCR accuracy : Consider specialized OCR services for better accuracy
Memory management : Process large documents in chunks to avoid memory issues
Error handling : Implement fallback strategies for corrupted files
Security : Validate file types and scan for malware before processing
Performance : Use parallel processing for batch operations
Next Steps
Batch Processing Handle large datasets efficiently
Document Embeddings Make documents searchable with AI