Queues in Hyrex let you route tasks to specific worker pools, control concurrency, and organize your workload. Workers select which queues to process using glob patterns.

Default Queue

All tasks go to the "default" queue unless a queue is specified:
from hyrex import HyrexRegistry

hy = HyrexRegistry()

@hy.task
def simple_task(data: dict):
    # Automatically goes to "default" queue
    return {"processed": True}

Named Queues

Route tasks to specific queues using the queue parameter:
@hy.task(queue="email-processing")
def send_email(to: str, subject: str):
    # This task only runs on workers processing "email-processing" queue
    return {"sent": True}

@hy.task(queue="data-processing")
def process_data(file_path: str):
    # Runs on workers processing "data-processing" queue
    return {"processed": True}

Concurrency Control

Use HyrexQueue to limit concurrent task execution:
from hyrex import HyrexQueue
import requests

# Limit to 1 concurrent task
@hy.task(queue=HyrexQueue(name="api-calls", concurrency_limit=1))
def call_external_api(endpoint: str):
    # Only one instance runs at a time
    response = requests.get(endpoint)
    return response.json()

# Limit database operations
@hy.task(queue=HyrexQueue(name="db-writes", concurrency_limit=5))
def write_to_database(data: dict):
    # Maximum 5 concurrent database writes
    save_to_db(data)  # save_to_db() is an application-defined helper
    return {"saved": True}

Dynamic Queue Assignment

Override a task's queue at send time with with_config:
@hy.task(queue="normal")
def flexible_task(data: dict):
    return process(data)

# Send to different queues based on conditions
if urgent:
    flexible_task.with_config(queue="high-priority").send(data)
elif large_dataset:
    flexible_task.with_config(queue="heavy-processing").send(data)
else:
    flexible_task.send(data)  # Uses default "normal" queue

Queue Naming Patterns

Organize queues by purpose, priority, or resource requirements:
# By task type
@hy.task(queue="email-notifications")
def send_notification(email: str): pass

@hy.task(queue="report-generation")
def generate_report(params: dict): pass

# By priority
@hy.task(queue="critical")
def process_payment(payment: dict): pass

@hy.task(queue="background")
def cleanup_old_files(): pass

# By resource needs
@hy.task(queue="cpu-intensive")
def analyze_data(dataset: str): pass

@hy.task(queue="io-heavy")
def bulk_file_transfer(files: list): pass

@hy.task(queue="gpu-required")
def train_model(config: dict): pass

Worker Configuration

Configure workers to process specific queues using glob patterns:
# Process all queues (default pattern is "*")
hyrex run-worker hyrex_app:app

# Process queues matching pattern
hyrex run-worker hyrex_app:app --queue-pattern "email*"

# Process specific queue only
hyrex run-worker hyrex_app:app --queue-pattern "critical"

# Glob patterns (supports *, ?, [], {})
hyrex run-worker hyrex_app:app --queue-pattern "email-*"
hyrex run-worker hyrex_app:app --queue-pattern "task-[0-9]"
hyrex run-worker hyrex_app:app --queue-pattern "task-?"

Multiple Worker Pools

Run different worker pools for different queue patterns:
# Terminal 1: Critical tasks only
hyrex run-worker hyrex_app:app --queue-pattern "critical" --num-processes 4

# Terminal 2: Email processing
hyrex run-worker hyrex_app:app --queue-pattern "email-*" --num-processes 2

# Terminal 3: Background processing
hyrex run-worker hyrex_app:app --queue-pattern "background-*" --num-processes 8

Best Practices

  1. Use descriptive queue names that indicate purpose or resource requirements
  2. Set concurrency limits for rate-limited resources (APIs, databases)
  3. Separate CPU-intensive and I/O-bound tasks into different queues
  4. Scale workers independently based on queue workload
  5. Monitor queue depths to identify bottlenecks

Next Steps