Create searchable document embeddings from Google Drive files using OpenAI’s embedding API and PostgreSQL vector storage.

Overview

This example shows how to build an AI-ready document processing pipeline that:
  • Syncs documents from Google Drive
  • Generates embeddings using OpenAI’s API
  • Stores vectors in PostgreSQL for similarity search
  • Provides REST endpoints to trigger processing
Perfect for building RAG (Retrieval Augmented Generation) systems, semantic search, or document similarity matching.

Task Definitions

from hyrex import HyrexRegistry
from pgvector.psycopg2 import register_vector
import psycopg2
import openai
import os

hy = HyrexRegistry()

@hy.task
def update_db_record(doc_id: str, embedding: list[float]):
    """Store document embedding in PostgreSQL"""
    with psycopg2.connect(os.environ["DB_CONN_STRING"]) as conn:
        # register_vector (from the pgvector package) adapts Python lists
        # to the vector column type automatically
        register_vector(conn)
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE documents SET embedding = %s WHERE doc_id = %s",
                (embedding, doc_id),
            )

@hy.task
def process_document(doc_id: str):
    """Generate embedding for a single document"""
    # Fetch document content from Google Drive
    # (gdrive_sdk is a placeholder for your Google Drive client wrapper)
    file_content = gdrive_sdk.get_document(doc_id)
    
    # Generate embedding using OpenAI
    response = openai.embeddings.create(
        model="text-embedding-3-small",
        input=file_content
    )
    embedding = response.data[0].embedding
    
    # Store embedding in database
    update_db_record.send(doc_id, embedding)

@hy.task
def sync_google_drive_documents():
    """Process all documents in Google Drive"""
    all_doc_ids = gdrive_sdk.list_document_ids()
    for doc_id in all_doc_ids:
        process_document.send(doc_id)

REST API Publisher

Create endpoints to trigger document processing on-demand:
from fastapi import FastAPI
from pydantic import BaseModel
from .tasks import process_document, sync_google_drive_documents

app = FastAPI()

class DocumentRequest(BaseModel):
    doc_id: str
    
class BulkSyncRequest(BaseModel):
    folder_id: str | None = None

@app.post("/sync/document")
async def sync_document(request: DocumentRequest):
    """Process a single document"""
    task = process_document.send(request.doc_id)
    return {"message": "Document sync started", "task_id": task.task_id}

@app.post("/sync/all-documents") 
async def sync_all_documents(request: BulkSyncRequest | None = None):
    """Process all Google Drive documents"""
    task = sync_google_drive_documents.send()
    return {"message": "Bulk sync started", "task_id": task.task_id}

Database Schema

Set up PostgreSQL with vector extension for similarity search:
-- Enable vector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create documents table
CREATE TABLE documents (
    doc_id VARCHAR(255) PRIMARY KEY,
    title TEXT,
    content TEXT,
    embedding vector(1536), -- OpenAI text-embedding-3-small dimension
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- Create index for similarity search
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops) 
WITH (lists = 100);
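If you query the table with raw SQL rather than a client-side adapter, the embedding must be passed as a pgvector input literal. A minimal sketch of that formatting (the function name and the query shown in the comment are illustrative, not part of the pipeline above):

```python
def to_pgvector_literal(embedding: list[float]) -> str:
    """Format a Python float list as a pgvector input literal, e.g. '[0.1,0.2]'."""
    return "[" + ",".join(repr(x) for x in embedding) + "]"

# Usage: bind the literal as a parameter and cast it to vector in SQL:
#   cursor.execute(
#       "SELECT doc_id FROM documents ORDER BY embedding <=> %s::vector LIMIT 10",
#       (to_pgvector_literal(query_embedding),),
#   )
```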

Usage Examples

# Process a single document
curl -X POST http://localhost:8000/sync/document \
  -H "Content-Type: application/json" \
  -d '{"doc_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms"}'

# Process all documents  
curl -X POST http://localhost:8000/sync/all-documents \
  -H "Content-Type: application/json" \
  -d '{}'
Once embeddings are generated, perform semantic search:
-- Find documents similar to a query
SELECT doc_id, title, 1 - (embedding <=> query_embedding) as similarity
FROM documents 
ORDER BY embedding <=> query_embedding
LIMIT 10;
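The `<=>` operator returns cosine distance, so `1 - (embedding <=> query_embedding)` is cosine similarity. A pure-Python illustration of the same computation, for intuition only:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity; matches 1 - (a <=> b) for pgvector's cosine distance."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Identical directions score 1.0; orthogonal vectors score 0.0.
```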

Production Considerations

  • Rate limiting: OpenAI has API rate limits - use appropriate retry policies
  • Chunking: For large documents, split into chunks before embedding
  • Caching: Cache embeddings to avoid reprocessing unchanged documents
  • Monitoring: Track embedding generation costs and processing times
  • Security: Store API keys securely and validate document access permissions
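For the chunking point above, a minimal sketch of overlapping character-based chunks; the defaults are illustrative, and production systems often chunk by tokens or sentences instead:

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> list[str]:
    """Split text into overlapping character chunks for embedding.

    chunk_size/overlap are illustrative defaults; tune them for your model's
    token limit (text-embedding-3-small accepts up to 8191 tokens).
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    # Each chunk starts `step` characters after the previous one, so
    # consecutive chunks share `overlap` characters of context.
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
```

Each chunk would then be embedded separately, with rows keyed by `(doc_id, chunk_index)` rather than `doc_id` alone.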

Next Steps