from hyrex import HyrexRegistry
import PyPDF2
import docx
from PIL import Image
import pytesseract
import os
# Registry instance whose `task` decorator is applied to the functions below.
hy = HyrexRegistry()
@hy.task
def extract_text_from_pdf(file_path: str) -> str:
    """Return the text of every page of a PDF, concatenated in page order.

    Args:
        file_path: Path of the PDF file; it is opened in binary mode.

    Returns:
        The per-page results of ``extract_text()`` joined with no separator.
    """
    with open(file_path, 'rb') as pdf_stream:
        pdf = PyPDF2.PdfReader(pdf_stream)
        # Extract while the stream is still open; the reader works off it.
        page_texts = [page.extract_text() for page in pdf.pages]
    return "".join(page_texts)
@hy.task
def extract_text_from_docx(file_path: str) -> str:
    """Return the text of a Word document, one paragraph per line.

    Args:
        file_path: Path of the .docx file, passed to ``docx.Document``.

    Returns:
        The text of each entry in ``doc.paragraphs``, each followed by a
        newline (so a non-empty result ends with a trailing newline).
    """
    doc = docx.Document(file_path)
    # Terminate each paragraph with a real newline. The previous literal was
    # an escaped backslash followed by "n", which put those two characters
    # into the output instead of a line break.
    return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)
@hy.task
def extract_text_from_image(file_path: str) -> str:
    """Extract text from an image file using OCR.

    Args:
        file_path: Path of the image, opened with ``PIL.Image.open``.

    Returns:
        The result of ``pytesseract.image_to_string`` for the image.
    """
    # Open the image as a context manager so its underlying file handle is
    # released once OCR has finished, rather than whenever the object is
    # eventually garbage-collected.
    with Image.open(file_path) as image:
        return pytesseract.image_to_string(image)
@hy.task
def store_document_text(file_path: str, text: str):
    """Persist extracted text and report the identifier it was stored under.

    Args:
        file_path: Path of the source document, forwarded to
            ``store_in_database``.
        text: Extracted text to store.

    Returns:
        A dict with ``document_id`` (the value returned by
        ``store_in_database``) and ``text_length`` (``len(text)``).
    """
    # NOTE(review): store_in_database is not defined in the visible part of
    # this file; it is the placeholder for the real storage logic.
    stored_id = store_in_database(file_path, text)
    summary = {"document_id": stored_id}
    summary["text_length"] = len(text)
    return summary
@hy.task
def process_document(file_path: str, file_type: str):
    """Extract a document's text with the matching extractor task and store it.

    Args:
        file_path: Path of the document to process.
        file_type: One of ``"pdf"``, ``"docx"``, ``"jpg"``, ``"png"`` or
            ``"jpeg"`` (compared exactly, so case matters).

    Returns:
        A dict with the file path and type, a preview of the extracted text
        (the first 500 characters followed by ``"..."`` when the text is
        longer than that, otherwise the full text), the full text length and
        the ``document_id`` reported by ``store_document_text``.

    Raises:
        ValueError: If ``file_type`` is not one of the supported types.
    """
    # Pick the extractor task first so the send/get call appears only once.
    if file_type == "pdf":
        extractor = extract_text_from_pdf
    elif file_type == "docx":
        extractor = extract_text_from_docx
    elif file_type in ("jpg", "png", "jpeg"):
        extractor = extract_text_from_image
    else:
        raise ValueError(f"Unsupported file type: {file_type}")

    # Each sub-task is sent and then waited on via .get() before continuing.
    text = extractor.send(file_path).get()
    stored = store_document_text.send(file_path, text).get()

    if len(text) > 500:
        preview = text[:500] + "..."
    else:
        preview = text

    return {
        "file_path": file_path,
        "file_type": file_type,
        "extracted_text": preview,
        "text_length": len(text),
        "document_id": stored["document_id"],
    }
@hy.task
def batch_process_documents(folder_path: str):
    """Send a ``process_document`` task for each supported file in a folder.

    Only direct entries of ``folder_path`` are considered (no recursion).
    An entry is dispatched when it is a regular file whose extension,
    compared case-insensitively, is pdf, docx, jpg, png or jpeg. The tasks
    are sent but not waited on, so the result lists task ids rather than
    extraction results.

    Args:
        folder_path: Directory whose entries are listed with ``os.listdir``.

    Returns:
        A dict with ``folder_path``, ``processed_files`` (one dict per
        dispatched file holding ``filename``, ``file_path`` and ``task_id``)
        and ``total_files`` (the number of dispatched files).
    """
    supported_extensions = {'pdf', 'docx', 'jpg', 'png', 'jpeg'}
    processed_files = []
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        # splitext yields "" when there is no real extension, so an entry
        # literally named "pdf" is no longer mistaken for a PDF document
        # (split('.')[-1] returned the whole name in that case).
        file_ext = os.path.splitext(filename)[1][1:].lower()
        if file_ext not in supported_extensions:
            continue
        # Skip directories (or other non-files) that merely carry a
        # supported suffix; the extractors can only open regular files.
        if not os.path.isfile(file_path):
            continue
        task = process_document.send(file_path, file_ext)
        processed_files.append({
            "filename": filename,
            "file_path": file_path,
            "task_id": task.task_id,
        })
    return {
        "folder_path": folder_path,
        "processed_files": processed_files,
        "total_files": len(processed_files),
    }