Build DAG-based task workflows with TypeScript.
Define a workflow with the `hy.workflow()` method:
import { HyrexRegistry } from '@hyrex/hyrex';
// Registry instance; the tasks and workflows in these examples are declared through it
// via hy.task(...) and hy.workflow(...).
const hy = new HyrexRegistry();
// Define tasks first
// 'extract' task: its async handler resolves to a fixed placeholder payload.
const extractTask = hy.task({
  name: 'extract',
  func: async () => ({ data: 'extracted' }),
});
// 'transform' task: its async handler resolves to a fixed placeholder payload.
const transformTask = hy.task({
  name: 'transform',
  func: async () => ({ data: 'transformed' }),
});
// 'load' task: its async handler resolves to a fixed placeholder payload.
const loadTask = hy.task({
  name: 'load',
  func: async () => ({ data: 'loaded' }),
});
// Create workflow
// Links the three ETL tasks into one chain: extract, then transform, then load.
// The body callback receives the builder, wires the steps, and hands the builder back.
const etlWorkflow = hy.workflow({
  name: 'etlWorkflow',
  config: {},
  body: (builder) => {
    const afterExtract = builder.start(extractTask);
    const afterTransform = afterExtract.next(transformTask);
    afterTransform.next(loadTask);
    return builder;
  },
});
// Fan-out / fan-in example: one starting task, an array of tasks, then a single task.
const parallelWorkflow = hy.workflow({
  name: 'parallelWorkflow',
  config: { queue: 'processing' },
  body: (builder) => {
    // Run taskB, taskC, and taskD in parallel after taskA
    const fanOut = builder.start(taskA).next([taskB, taskC, taskD]);
    // taskE runs after all parallel tasks complete
    fanOut.next(taskE);
    return builder;
  },
});
// Onboarding example with a main path plus an extra branch hanging off one of the
// validation tasks.
const onboardUser = hy.workflow({
  name: 'onboardUser',
  config: { queue: 'onboard-user' },
  body: (builder) => {
    // Main path
    const entry = builder.start(initiateOnboard);
    const validations = entry.next([validatePayment, validateIdentity, validateOrg]);
    validations.next(approveUser);

    // Branch from validateIdentity
    const creditCheck = validateIdentity.next(checkCredit);
    creditCheck.next(trainCreditMachineLearningModel);

    return builder;
  },
});
import { z } from 'zod';
// Zod schema for the workflow arguments: an email-formatted string plus one of three
// sign-up tier literals.
const WorkflowArgs = z.object({
  userEmail: z.string().email(),
  signUpTier: z.enum(['FREE', 'PRO', 'ENTERPRISE']),
});
// Workflow that declares an argument schema (workflowArgSchema) alongside its task chain:
// create the user, send the welcome email, then set up defaults.
const onboardingWorkflow = hy.workflow({
  name: 'onboardingWorkflow',
  config: { queue: 'onboarding' },
  workflowArgSchema: WorkflowArgs,
  body: (builder) => {
    const afterCreate = builder.start(createUserTask);
    const afterWelcome = afterCreate.next(sendWelcomeEmailTask);
    afterWelcome.next(setupDefaultsTask);
    return builder;
  },
});
// Send workflow with arguments
// The object supplies the two fields declared in the WorkflowArgs schema
// (userEmail and signUpTier).
await onboardingWorkflow.send({
userEmail: 'user@example.com',
signUpTier: 'PRO'
});
import { getHyrexContext, HyrexKV } from '@hyrex/hyrex';
// 'createPDF' task: logs the workflow run ID taken from the Hyrex context, then stores
// the PDF path in HyrexKV under a key that embeds that run ID.
const createPDFTask = hy.task({
  name: 'createPDF',
  func: async () => {
    const context = getHyrexContext();
    console.log(`Workflow Run ID: ${context.workflowRunId}`);

    // Store data for other workflow tasks
    const pdfPath = 'userReport.pdf';
    const storageKey = `workflow-${context.workflowRunId}-pdf`;
    await HyrexKV.set(storageKey, pdfPath);

    return { created: true };
  },
});
// 'sendPDF' task: reads the PDF path back from HyrexKV using the run-scoped key
// and logs it.
const sendPDFTask = hy.task({
  name: 'sendPDF',
  func: async () => {
    const context = getHyrexContext();

    // Retrieve data from previous task
    const storageKey = `workflow-${context.workflowRunId}-pdf`;
    const pdfPath = await HyrexKV.get(storageKey);
    console.log(`Sending PDF: ${pdfPath}`);

    return { sent: true };
  },
});
// Two-step workflow: createPDFTask first, sendPDFTask after it.
const pdfWorkflow = hy.workflow({
  name: 'pdfWorkflow',
  config: {},
  body: (builder) => {
    const afterCreate = builder.start(createPDFTask);
    afterCreate.next(sendPDFTask);
    return builder;
  },
});
// Workflow whose middle step is given its own settings through withConfig(), while the
// workflow itself is configured with queue 'default'.
const configuredWorkflow = hy.workflow({
  name: 'configuredWorkflow',
  config: { queue: 'default' },
  body: (builder) => {
    const afterExtract = builder.start(extractTask);

    // Per-step settings passed to transformTask.withConfig() for this workflow
    const tunedTransform = transformTask.withConfig({
      queue: 'cpu-intensive',
      maxRetries: 5,
      timeoutSeconds: 600,
    });

    afterExtract.next(tunedTransform).next(loadTask);
    return builder;
  },
});
To use the same task more than once in a workflow, call `.copy()` on it:
// Workflow that places validateTask twice: once at the start and once more, via
// validateTask.copy(), after processing.
const multiStageWorkflow = hy.workflow({
  name: 'multiStageWorkflow',
  config: {},
  body: (builder) => {
    const validated = builder.start(validateTask);
    const processed = validated.next(processTask);
    // Run validation again
    const revalidated = processed.next(validateTask.copy());
    revalidated.next(finalizeTask);
    return builder;
  },
});
// Example showing the workflow-level config options; the body is left as a placeholder
// that returns the builder untouched.
const workflow = hy.workflow({
  name: 'myWorkflow',
  config: {
    // Queue for all workflow tasks
    queue: 'workflows',
    // Default retries for tasks
    maxRetries: 2,
    // 30 minute timeout
    timeoutSeconds: 1800,
    // High priority
    priority: 7,
    // Prevent duplicate runs
    idempotencyKey: 'unique-123',
  },
  body: (builder) => {
    // Workflow definition
    return builder;
  },
});
// Two ways of calling send(): with no arguments, and with an arguments object.
// NOTE(review): simpleWorkflow is not defined in these examples — presumably any
// workflow declared without a workflowArgSchema; confirm.
// Send workflow without arguments
await simpleWorkflow.send();
// Send workflow with arguments (validated by schema)
await onboardingWorkflow.send({
userEmail: 'user@example.com',
signUpTier: 'PRO'
});
// Fragment meant to sit inside a workflow body callback (workflowBuilder is its parameter).
// .next([...]) is given an array of tasks; .next(task) is given a single task.
// Good: Clear linear and parallel paths
workflowBuilder
.start(validateInput)
.next([fetchUserData, fetchProductData]) // Parallel
.next(processOrder) // Sequential
.next([sendEmail, updateInventory]); // Parallel
// Zod schema for order-workflow arguments: a UUID-formatted order ID, a priority
// literal, and an array of line items (SKU string plus a positive quantity).
const WorkflowArgs = z.object({
  orderId: z.string().uuid(),
  priority: z.enum(['normal', 'express']),
  items: z.array(
    z.object({
      sku: z.string(),
      quantity: z.number().positive(),
    }),
  ),
});
// Fragment meant to run inside a task: ctx and data are expected to be in scope already.
// The key embeds ctx.workflowRunId — presumably so separate runs do not overwrite each
// other's entries; the value is serialized with JSON.stringify before being stored.
// Use HyrexKV with workflow-specific keys
const key = `workflow-${ctx.workflowRunId}-data`;
await HyrexKV.set(key, JSON.stringify(data));
// Individual tasks handle their own errors
// 'robustTask' wraps riskyOperation() in try/catch: on failure it logs the error and
// returns a { status: 'failed', error } result object instead of rethrowing.
// NOTE(review): because the error is caught here, nothing propagates out of func; whether
// the configured maxRetries still applies in that case is not visible here — confirm.
const robustTask = hy.task({
  name: 'robustTask',
  config: { maxRetries: 3 },
  func: async () => {
    try {
      return await riskyOperation();
    } catch (error) {
      // Log error but allow workflow to continue
      console.error('Task failed:', error);
      // JavaScript can throw any value (strings, null, plain objects). Read .message
      // defensively and fall back to the value's string form, so the result never
      // carries `error: undefined` and the handler itself cannot throw on null/undefined.
      const message = error?.message ?? String(error);
      return { status: 'failed', error: message };
    }
  },
});