# Workflows

Workflows in Hyrex let you orchestrate multiple tasks into directed acyclic graphs (DAGs). Tasks can run in sequence or in parallel, with dependencies managed automatically.
## Basic Workflow

Define workflows using the `@hy.workflow` decorator and the `>>` operator:
```python
from hyrex import HyrexRegistry

hy = HyrexRegistry()

@hy.task
def extract():
    # Extract data from source
    data = fetch_from_database()
    return {"count": len(data), "data": data}

@hy.task
def transform():
    # Transform the data
    processed = apply_transformations()
    return {"processed_count": len(processed)}

@hy.task
def load():
    # Load to destination
    write_to_destination()
    return {"status": "completed"}

@hy.workflow
def etl_workflow():
    # Define task dependencies
    extract >> transform >> load
```
## Parallel Execution

Run multiple tasks in parallel using list notation:
```python
@hy.task
def fetch_source_a():
    return {"source": "a", "data": get_data_a()}

@hy.task
def fetch_source_b():
    return {"source": "b", "data": get_data_b()}

@hy.task
def fetch_source_c():
    return {"source": "c", "data": get_data_c()}

@hy.task
def combine_sources():
    # This runs after all parallel tasks complete
    return {"combined": True}

@hy.workflow
def parallel_workflow():
    # Run fetch tasks in parallel, then combine
    [fetch_source_a, fetch_source_b, fetch_source_c] >> combine_sources
```
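List notation composes in both directions: a list on the right of `>>` fans out, and a list on the left fans in, as the complex example later on this page also shows. A minimal diamond-shaped sketch, assuming a `prepare` task defined with `@hy.task` like the ones above:

```python
@hy.workflow
def diamond_workflow():
    # Fan out from one task to three parallel fetches,
    # then fan back in to a single combiner
    prepare >> [fetch_source_a, fetch_source_b, fetch_source_c]
    [fetch_source_a, fetch_source_b, fetch_source_c] >> combine_sources
```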
## Triggering Workflows

Use the `.send()` method to trigger workflow execution:
```python
# Simple workflow without arguments
@hy.workflow
def simple_workflow():
    extract >> transform >> load

# Trigger the workflow
workflow_run = simple_workflow.send()
```
For workflows that take arguments, pass an instance of the argument schema to `.send()`:
```python
@hy.workflow(workflow_arg_schema=ETLConfig)
def parameterized_workflow():
    extract >> transform >> load

# Trigger with typed arguments
workflow_run = parameterized_workflow.send(ETLConfig(
    source_table="events",
    destination_table="processed_events",
))
```
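Because `.send()` triggers a run each time it is called, you can enqueue a batch of runs from a loop. A sketch using the `ETLConfig` schema defined in the next section; the table names are illustrative:

```python
# Enqueue one workflow run per source table
for table in ["events", "clicks", "pageviews"]:
    parameterized_workflow.send(ETLConfig(
        source_table=table,
        destination_table=f"processed_{table}",
    ))
```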
## Workflow Arguments

Pass typed arguments to workflows using Pydantic models:
```python
from pydantic import BaseModel
from typing import List

from hyrex import get_hyrex_workflow_context

class ETLConfig(BaseModel):
    source_table: str
    destination_table: str
    filters: List[str] = []
    batch_size: int = 1000

@hy.task
def extract_with_config():
    # Access workflow args in tasks
    ctx = get_hyrex_workflow_context()
    if ctx and ctx.workflow_args:
        config = ETLConfig(**ctx.workflow_args)
        print(f"Extracting from {config.source_table}")
        # Use config.filters, config.batch_size, etc.
    return {"extracted": True}

@hy.workflow(workflow_arg_schema=ETLConfig)
def parameterized_etl():
    extract_with_config >> transform >> load

# Send workflow with arguments
workflow_run = parameterized_etl.send(ETLConfig(
    source_table="raw_events",
    destination_table="processed_events",
    filters=["status = 'active'"],
    batch_size=5000,
))
```
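Because the argument schema is a Pydantic model, malformed arguments fail at construction time, before anything is sent. A quick sketch of catching that:

```python
from pydantic import ValidationError

try:
    # destination_table is required, so this raises ValidationError
    bad_config = ETLConfig(source_table="raw_events")
except ValidationError as exc:
    print(f"Invalid workflow arguments: {exc}")
```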
## Access Workflow Context

Tasks within a workflow can access the workflow context and the results of other tasks:
```python
from hyrex import get_hyrex_workflow_context

@hy.task
def validate_extraction():
    context = get_hyrex_workflow_context()
    if context:
        # Access workflow arguments
        if context.workflow_args:
            config = ETLConfig(**context.workflow_args)
            print(f"Validating extraction for {config.source_table}")
        # Access other task results by task name
        extract_run = context.durable_runs.get("extract_with_config")
        if extract_run:
            # Refresh to get latest status
            extract_run.refresh()
            # Check task runs
            for run in extract_run.task_runs:
                if run.status == "completed":
                    print(f"Extract result: {run.result}")
    return {"validation": "passed"}
```
## Task Configuration in Workflows

Override task configuration within workflows:
```python
@hy.workflow(
    queue="etl-workflows",  # Default queue for all tasks
    priority=7,
)
def configured_workflow():
    # Extract uses the workflow defaults; transform needs more
    # resources; load goes to a different queue
    (
        extract
        >> transform.with_config(
            queue="cpu-intensive",
            max_retries=5,
            timeout_seconds=600,
        )
        >> load.with_config(
            queue="io-heavy",
            max_retries=3,
        )
    )
```
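If the same overrides recur across workflows, one option is to bind them once and reuse the configured task. This assumes `with_config` returns a reusable configured reference, as the chained example above suggests:

```python
# Bind the heavyweight configuration once (assumed reusable)
heavy_transform = transform.with_config(
    queue="cpu-intensive",
    max_retries=5,
    timeout_seconds=600,
)

@hy.workflow
def nightly_etl():
    extract >> heavy_transform >> load
```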
## Complex Workflow Patterns

Build sophisticated DAGs with multiple parallel and sequential stages:
```python
@hy.workflow
def complex_etl():
    # Stage 1: Extract from multiple sources in parallel
    # Stage 2: Validate each source (parallel), connecting
    # each extractor to its validator
    extract_postgres >> validate_postgres
    extract_s3 >> validate_s3
    extract_api >> validate_api

    # Stage 3: Merge all validated data
    validations = [validate_postgres, validate_s3, validate_api]
    validations >> merge_data

    # Stage 4: Transform and enrich in parallel
    merge_data >> [transform_users, transform_events, enrich_metadata]

    # Stage 5: Final load
    [transform_users, transform_events, enrich_metadata] >> load_warehouse
```
## Next Steps

- **Queues**: Route workflow tasks to specific workers
- **Monitoring**: Monitor workflow execution