Workflows let you orchestrate multiple tasks as a directed acyclic graph (DAG): you declare the dependencies between tasks, and tasks that do not depend on each other can run in parallel.

Basic Workflow

Define a workflow using the hy.workflow() method:
import { HyrexRegistry } from '@hyrex/hyrex';

const hy = new HyrexRegistry();

// Define tasks first
const extractTask = hy.task({
    name: 'extract',
    func: async () => {
        return { data: 'extracted' };
    }
});

const transformTask = hy.task({
    name: 'transform',
    func: async () => {
        return { data: 'transformed' };
    }
});

const loadTask = hy.task({
    name: 'load',
    func: async () => {
        return { data: 'loaded' };
    }
});

// Create workflow
const etlWorkflow = hy.workflow({
    name: 'etlWorkflow',
    config: {},
    body: (workflowBuilder) => {
        workflowBuilder
            .start(extractTask)
            .next(transformTask)
            .next(loadTask);
        
        return workflowBuilder;
    }
});

Parallel Execution

Pass an array of tasks to .next() to run them in parallel:
const parallelWorkflow = hy.workflow({
    name: 'parallelWorkflow',
    config: { queue: 'processing' },
    body: (workflowBuilder) => {
        // Run taskB, taskC, and taskD in parallel after taskA
        workflowBuilder
            .start(taskA)
            .next([taskB, taskC, taskD])
            .next(taskE);  // taskE runs after all parallel tasks complete
        
        return workflowBuilder;
    }
});
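
The fan-out/fan-in ordering above can be pictured with plain promises. This is only a conceptual sketch of the ordering guarantee, not how Hyrex schedules tasks; the `step` helper, the delays, and `runFanOutFanIn` are hypothetical names introduced for illustration.

```typescript
// Conceptual sketch only: models `.start(A).next([B, C, D]).next(E)` with promises.
// `step` and `runFanOutFanIn` are hypothetical helpers, not part of the Hyrex API.
function step(name: string, delayMs: number, log: string[]): () => Promise<void> {
    return async () => {
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
        log.push(name); // record completion order
    };
}

async function runFanOutFanIn(): Promise<string[]> {
    const log: string[] = [];
    const taskA = step('A', 5, log);
    const parallel = [step('B', 15, log), step('C', 5, log), step('D', 10, log)];
    const taskE = step('E', 1, log);

    await taskA();                                   // start
    await Promise.all(parallel.map((run) => run())); // fan-out: B, C, D run concurrently
    await taskE();                                   // fan-in: E waits for all three
    return log;
}
```

Whatever order B, C, and D finish in, A is always first and E is always last.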

Complex DAG Patterns

Build complex workflows with branching paths:
const onboardUser = hy.workflow({
    name: 'onboardUser',
    config: { queue: 'onboard-user' },
    body: (workflowBuilder) => {
        // Main path
        workflowBuilder
            .start(initiateOnboard)
            .next([validatePayment, validateIdentity, validateOrg])
            .next(approveUser);
        
        // Branch from validateIdentity
        validateIdentity
            .next(checkCredit)
            .next(trainCreditMachineLearningModel);
        
        return workflowBuilder;
    }
});
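
To make the shape of this graph concrete, the sketch below writes the same dependencies as an edge list and groups tasks into "waves" whose prerequisites are all complete, using Kahn's topological sort. It assumes that `approveUser` waits for all three validations and that the `checkCredit` branch depends only on `validateIdentity`. The `Edge` type and `executionWaves` function are hypothetical and say nothing about how Hyrex actually schedules tasks.

```typescript
// Sketch: the onboardUser graph as an edge list, grouped into waves of tasks
// whose dependencies are all satisfied (Kahn's algorithm). Illustrative only.
type Edge = [from: string, to: string];

const onboardEdges: Edge[] = [
    ['initiateOnboard', 'validatePayment'],
    ['initiateOnboard', 'validateIdentity'],
    ['initiateOnboard', 'validateOrg'],
    ['validatePayment', 'approveUser'],
    ['validateIdentity', 'approveUser'],
    ['validateOrg', 'approveUser'],
    ['validateIdentity', 'checkCredit'],                  // branch
    ['checkCredit', 'trainCreditMachineLearningModel'],
];

function executionWaves(edges: Edge[]): string[][] {
    const indegree = new Map<string, number>();   // unmet dependencies per task
    const children = new Map<string, string[]>(); // downstream tasks per task
    for (const [from, to] of edges) {
        if (!indegree.has(from)) indegree.set(from, 0);
        indegree.set(to, (indegree.get(to) ?? 0) + 1);
        children.set(from, [...(children.get(from) ?? []), to]);
    }

    const waves: string[][] = [];
    let ready = Array.from(indegree.entries())
        .filter(([, count]) => count === 0)
        .map(([name]) => name);
    let visited = 0;

    while (ready.length > 0) {
        waves.push(ready);
        visited += ready.length;
        const next: string[] = [];
        for (const name of ready) {
            for (const child of children.get(name) ?? []) {
                const remaining = indegree.get(child)! - 1;
                indegree.set(child, remaining);
                if (remaining === 0) next.push(child); // all prerequisites done
            }
        }
        ready = next;
    }

    if (visited !== indegree.size) throw new Error('Cycle detected: not a DAG');
    return waves;
}
```

Under these assumptions, `executionWaves(onboardEdges)` yields four waves: the initial task, the three validations, then `approveUser` alongside `checkCredit`, and finally the model-training task.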

Workflow Arguments

Declare a workflow's arguments with a Zod schema; the arguments passed to send() are validated against it:
import { z } from 'zod';

const WorkflowArgs = z.object({
    userEmail: z.string().email(),
    signUpTier: z.enum(['FREE', 'PRO', 'ENTERPRISE'])
});

const onboardingWorkflow = hy.workflow({
    name: 'onboardingWorkflow',
    config: { queue: 'onboarding' },
    workflowArgSchema: WorkflowArgs,
    body: (workflowBuilder) => {
        workflowBuilder
            .start(createUserTask)
            .next(sendWelcomeEmailTask)
            .next(setupDefaultsTask);
        
        return workflowBuilder;
    }
});

// Send workflow with arguments
await onboardingWorkflow.send({
    userEmail: 'user@example.com',
    signUpTier: 'PRO'
});

Access Workflow Context

Tasks running inside a workflow can read workflow metadata, such as the workflow run ID, through getHyrexContext():
import { getHyrexContext, HyrexKV } from '@hyrex/hyrex';

const createPDFTask = hy.task({
    name: 'createPDF',
    func: async () => {
        const ctx = getHyrexContext();
        console.log(`Workflow Run ID: ${ctx.workflowRunId}`);
        
        // Store data for other workflow tasks
        const pdfPath = 'userReport.pdf';
        await HyrexKV.set(`workflow-${ctx.workflowRunId}-pdf`, pdfPath);
        
        return { created: true };
    }
});

const sendPDFTask = hy.task({
    name: 'sendPDF',
    func: async () => {
        const ctx = getHyrexContext();
        
        // Retrieve data from previous task
        const pdfPath = await HyrexKV.get(`workflow-${ctx.workflowRunId}-pdf`);
        console.log(`Sending PDF: ${pdfPath}`);
        
        return { sent: true };
    }
});

const pdfWorkflow = hy.workflow({
    name: 'pdfWorkflow',
    config: {},
    body: (workflowBuilder) => {
        workflowBuilder
            .start(createPDFTask)
            .next(sendPDFTask);
        
        return workflowBuilder;
    }
});
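
The reason for embedding the workflow run ID in the key is that two runs of the same workflow would otherwise overwrite each other's values. The sketch below illustrates this with an in-memory stand-in. `InMemoryKV`, `workflowKey`, and `PdfInfo` are hypothetical names, and the stand-in assumes string keys and string values, so structured data is serialized with JSON.

```typescript
// Sketch: namespacing shared values by workflow run so concurrent runs do not collide.
// `InMemoryKV` is a hypothetical stand-in for a key-value store, not HyrexKV itself.
class InMemoryKV {
    private readonly store = new Map<string, string>();

    async set(key: string, value: string): Promise<void> {
        this.store.set(key, value);
    }

    async get(key: string): Promise<string | undefined> {
        return this.store.get(key);
    }
}

// Builds a key that is unique per workflow run and per logical value.
function workflowKey(workflowRunId: string, name: string): string {
    return `workflow-${workflowRunId}-${name}`;
}

interface PdfInfo {
    path: string;
    pages: number;
}

async function shareBetweenTasks(): Promise<PdfInfo[]> {
    const kv = new InMemoryKV();

    // "Producer" tasks from two different runs write under the same logical name.
    await kv.set(workflowKey('run-1', 'pdf'), JSON.stringify({ path: 'reportA.pdf', pages: 3 }));
    await kv.set(workflowKey('run-2', 'pdf'), JSON.stringify({ path: 'reportB.pdf', pages: 7 }));

    // "Consumer" tasks read back only the value belonging to their own run.
    const results: PdfInfo[] = [];
    for (const runId of ['run-1', 'run-2']) {
        const raw = await kv.get(workflowKey(runId, 'pdf'));
        if (raw === undefined) throw new Error(`No value stored for ${runId}`);
        results.push(JSON.parse(raw) as PdfInfo);
    }
    return results;
}
```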

Task Configuration in Workflows

Apply task-specific configuration within workflows:
const configuredWorkflow = hy.workflow({
    name: 'configuredWorkflow',
    config: { queue: 'default' },
    body: (workflowBuilder) => {
        workflowBuilder
            .start(extractTask)
            .next(transformTask.withConfig({
                queue: 'cpu-intensive',
                maxRetries: 5,
                timeoutSeconds: 600
            }))
            .next(loadTask);
        
        return workflowBuilder;
    }
});
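
One way to reason about the result is as a merge in which task-level settings take precedence over workflow-level defaults. The sketch below assumes that precedence; it is not taken from the Hyrex source, and `TaskConfig` and `resolveConfig` are hypothetical names.

```typescript
// Sketch: task-level overrides layered on top of workflow-level defaults.
// Assumes "task config wins"; the real resolution rules may differ.
interface TaskConfig {
    queue?: string;
    maxRetries?: number;
    timeoutSeconds?: number;
    priority?: number;
}

function resolveConfig(workflowDefaults: TaskConfig, taskOverrides: TaskConfig = {}): TaskConfig {
    // Later spreads win, so any key present in taskOverrides replaces the default.
    return { ...workflowDefaults, ...taskOverrides };
}

// transformTask in the example above: overrides the queue and adds retry/timeout settings.
const transformConfig = resolveConfig(
    { queue: 'default' },
    { queue: 'cpu-intensive', maxRetries: 5, timeoutSeconds: 600 },
);

// extractTask and loadTask: no overrides, so they keep the workflow defaults.
const extractConfig = resolveConfig({ queue: 'default' });
```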

Task Copies

Use the same task multiple times in a workflow with .copy():
const multiStageWorkflow = hy.workflow({
    name: 'multiStageWorkflow',
    config: {},
    body: (workflowBuilder) => {
        workflowBuilder
            .start(validateTask)
            .next(processTask)
            .next(validateTask.copy())  // Run validation again
            .next(finalizeTask);
        
        return workflowBuilder;
    }
});

Workflow Configuration

Configure workflow behavior:
const workflow = hy.workflow({
    name: 'myWorkflow',
    config: {
        queue: 'workflows',           // Queue for all workflow tasks
        maxRetries: 2,               // Default retries for tasks
        timeoutSeconds: 1800,        // 30-minute timeout
        priority: 7,                 // High priority
        idempotencyKey: 'unique-123' // Prevent duplicate runs
    },
    body: (workflowBuilder) => {
        // Workflow definition
        return workflowBuilder;
    }
});

Sending Workflows

Send workflows for execution:
// Send workflow without arguments
await simpleWorkflow.send();

// Send workflow with arguments (validated by schema)
await onboardingWorkflow.send({
    userEmail: 'user@example.com',
    signUpTier: 'PRO'
});

Best Practices

  1. Define Clear Dependencies
    // Good: Clear linear and parallel paths
    workflowBuilder
        .start(validateInput)
        .next([fetchUserData, fetchProductData])  // Parallel
        .next(processOrder)                        // Sequential
        .next([sendEmail, updateInventory]);       // Parallel
    
  2. Use Type-Safe Arguments
    const WorkflowArgs = z.object({
        orderId: z.string().uuid(),
        priority: z.enum(['normal', 'express']),
        items: z.array(z.object({
            sku: z.string(),
            quantity: z.number().positive()
        }))
    });
    
  3. Share Data Between Tasks
    // Use HyrexKV with workflow-specific keys
    const ctx = getHyrexContext();
    const key = `workflow-${ctx.workflowRunId}-data`;
    await HyrexKV.set(key, JSON.stringify(data));
    
  4. Handle Workflow Errors
    // Individual tasks handle their own errors
    const robustTask = hy.task({
        name: 'robustTask',
        config: { maxRetries: 3 },
        func: async () => {
            try {
                return await riskyOperation();
            } catch (error) {
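                // Log the error and return a failure result so the workflow can continue.
                // Note: a caught error is not rethrown, so it should not trigger a retry;
                // rethrow it if you want maxRetries to apply.
                console.error('Task failed:', error);
                const message = error instanceof Error ? error.message : String(error);
                return { status: 'failed', error: message };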
            }
        }
    });
    

Next Steps