AI-Ready Datasets with Hyrex
So you want your AI agent to be aware of all of your content but how? Sounds like you need an index that stores your data in a way that can be quickly queried! But how do you parallelize all this data-processing and keep it up to date? Hyrex to the rescue!
With Hyrex, you can build robust data pipelines that automatically process documents, generate embeddings, and update your vector databases in real-time. Whether you're dealing with Google Drive documents, PDFs, or any other content source, Hyrex handles the heavy lifting of distributed processing while you focus on building great AI experiences.
Step 1: Define Your Data Processing Tasks
Start by creating Hyrex tasks that handle document processing, embedding generation, and database updates. These tasks run asynchronously on worker nodes and can be triggered by API calls or scheduled events.
1from hyrex import HyrexRegistry
2import psycopg2
3import openai
4
5hy = HyrexRegistry()
6
7@hy.task
8def update_db_record(doc_id: str, embedding: list[float]):
9    with psycopg2.connect(os.environ.get("DB_CONN_STRING")) as conn:
10        cursor = conn.cursor()
11        cursor.execute(
12            "UPDATE documents SET embedding = %s WHERE doc_id = %s",
13            (embedding, doc_id)
14        )
15
16@hy.task
17def process_document(doc_id: str):
18    file_content = gdrive_sdk.get_document(doc_id)
19
20    response = openai.embeddings.create(
21        model="text-embedding-3-small",
22        input=file_content
23    )
24    embedding = response.data[0].embedding
25
26    update_db_record.send(doc_id, embedding)
27
28@hy.task
29def sync_google_drive_documents():
30    all_doc_ids = gdrive_sdk.list_document_ids()
31    for doc_id in all_doc_ids:
32        process_document.send(doc_id)Step 2: Build APIs That Trigger Tasks
Create REST APIs that send tasks to your Hyrex workers. These publisher services act as entry points for external systems to trigger data processing workflows, returning task IDs for tracking progress.
1from fastapi import FastAPI
2from pydantic import BaseModel
3from .tasks import process_document, sync_google_drive_documents
4
5app = FastAPI()
6
7class DocumentRequest(BaseModel):
8    doc_id: str
9
10class BulkSyncRequest(BaseModel):
11    folder_id: str = None
12
13@app.post("/sync/document")
14async def sync_document(request: DocumentRequest):
15    # Send task to process a single document
16    task = process_document.send(request.doc_id)
17    return {"message": "Document sync started", "task_id": task.task_id}
18
19@app.post("/sync/all-documents")
20async def sync_all_documents(request: BulkSyncRequest = None):
21    # Send task to sync all Google Drive documents
22    task = sync_google_drive_documents.send()
23    return {"message": "Bulk sync started", "task_id": task.task_id}Congrats you did it!
Now you have a live index that automatically keeps your AI agent up to date with all your content! Your documents are being processed in parallel, embeddings are generated efficiently, and your vector database stays fresh with the latest information.
Want to make it even better? Consider setting up a cron job to periodically trigger the bulk sync endpoint, or add webhooks from your content sources to process updates in real-time as they happen.
Explore other use cases
Document Processing
Parse, transform, and analyze documents with parallel processing pipelines.
Agent Actions
Execute AI agent actions as durable, observable tasks.
Context Engineering
Orchestrate LLM context preparation with parallel processing.
Human-in-the-Loop
Integrate human approval steps with pause/resume workflows.