Workflows in Hyrex allow you to orchestrate multiple tasks into complex directed acyclic graphs (DAGs). Tasks can run in sequence or parallel, with automatic dependency management.

Basic Workflow

Define workflows using the @hy.workflow decorator and the >> operator:
from hyrex import HyrexRegistry

hy = HyrexRegistry()

@hy.task
def extract():
    # Extract data from source
    data = fetch_from_database()
    return {"count": len(data), "data": data}

@hy.task
def transform():
    # Transform the data
    processed = apply_transformations()
    return {"processed_count": len(processed)}

@hy.task
def load():
    # Load to destination
    write_to_destination()
    return {"status": "completed"}

@hy.workflow
def etl_workflow():
    # Define task dependencies
    extract >> transform >> load

Parallel Execution

Run multiple tasks in parallel using list notation:
@hy.task
def fetch_source_a():
    return {"source": "a", "data": get_data_a()}

@hy.task 
def fetch_source_b():
    return {"source": "b", "data": get_data_b()}

@hy.task
def fetch_source_c():
    return {"source": "c", "data": get_data_c()}

@hy.task
def combine_sources():
    # This runs after all parallel tasks complete
    return {"combined": True}

@hy.workflow
def parallel_workflow():
    # Run fetch tasks in parallel, then combine
    [fetch_source_a, fetch_source_b, fetch_source_c] >> combine_sources

Workflow Arguments

Pass typed arguments to workflows using Pydantic models:
from pydantic import BaseModel
from typing import List

class ETLConfig(BaseModel):
    source_table: str
    destination_table: str
    filters: List[str] = []
    batch_size: int = 1000

@hy.task
def extract_with_config():
    # Access workflow args in tasks
    ctx = get_hyrex_workflow_context()
    if ctx and ctx.workflow_args:
        config = ETLConfig(**ctx.workflow_args)
        print(f"Extracting from {config.source_table}")
        # Use config.filters, config.batch_size, etc.
    return {"extracted": True}

@hy.workflow(workflow_arg_schema=ETLConfig)
def parameterized_etl():
    extract_with_config >> transform >> load

# Send workflow with arguments
workflow_run = parameterized_etl.send(ETLConfig(
    source_table="raw_events",
    destination_table="processed_events",
    filters=["status = 'active'"],
    batch_size=5000
))

Access Workflow Context

Tasks within a workflow can access the workflow context and other task results:
from hyrex import get_hyrex_workflow_context

@hy.task
def validate_extraction():
    context = get_hyrex_workflow_context()
    
    if context:
        # Access workflow arguments
        if context.workflow_args:
            config = ETLConfig(**context.workflow_args)
            print(f"Validating extraction for {config.source_table}")
        
        # Access other task results by task name
        extract_run = context.durable_runs.get("extract_with_config")
        if extract_run:
            # Refresh to get latest status
            extract_run.refresh()
            
            # Check task runs
            for run in extract_run.task_runs:
                if run.status == "completed":
                    print(f"Extract result: {run.result}")
    
    return {"validation": "passed"}

Task Configuration in Workflows

Override task configuration within workflows:
@hy.workflow(
    queue="etl-workflows",  # Default queue for all tasks
    priority=7
)
def configured_workflow():
    # Extract uses workflow defaults
    extract >> \
    # Transform needs more resources
    transform.with_config(
        queue="cpu-intensive",
        max_retries=5,
        timeout_seconds=600
    ) >> \
    # Load uses different queue
    load.with_config(
        queue="io-heavy",
        max_retries=3
    )

Complex Workflow Patterns

Build sophisticated DAGs with multiple parallel and sequential stages:
@hy.workflow
def complex_etl():
    # Stage 1: Extract from multiple sources in parallel
    sources = [extract_postgres, extract_s3, extract_api]
    
    # Stage 2: Validate each source (parallel)
    validations = [validate_postgres, validate_s3, validate_api]
    
    # Connect sources to their validators
    extract_postgres >> validate_postgres
    extract_s3 >> validate_s3  
    extract_api >> validate_api
    
    # Stage 3: Merge all validated data
    validations >> merge_data
    
    # Stage 4: Transform and enrich in parallel
    merge_data >> [transform_users, transform_events, enrich_metadata]
    
    # Stage 5: Final load
    [transform_users, transform_events, enrich_metadata] >> load_warehouse

Next Steps