
Overview

ExuluQueues manages background job queues using BullMQ and Redis. It enables asynchronous processing of long-running tasks like embeddings generation, document processing, scheduled data syncs, and agent workflows. Queues help keep your application responsive by offloading heavy work to background workers.

Key features

  • BullMQ integration - Built on BullMQ, a robust Redis-based queue system
  • Concurrency control - Configure worker- and queue-level concurrency limits
  • Rate limiting - Control the job processing rate to manage API limits
  • Retries & backoff - Automatic retries with exponential or linear backoff
  • Timeouts - Set a maximum execution time for jobs
  • Telemetry - OpenTelemetry integration for monitoring

What is a queue?

A queue is a system for managing background jobs:
  1. Jobs are added to the queue with data and options
  2. Workers process jobs asynchronously in the background
  3. Results are tracked with success/failure status
  4. Failed jobs retry automatically based on configuration
Queues prevent blocking operations from slowing down your application and enable horizontal scaling by adding more workers.

Why use queues?

  • Long-running work - Tasks like embeddings generation, document processing, or data exports can take seconds or minutes. Queues let you respond to users immediately while work continues in the background.
  • Rate limiting - External APIs often have rate limits. Queues control processing speed to stay within those limits while maximizing throughput.
  • Reliability - Jobs persist in Redis. If a worker crashes, jobs aren't lost and are retried when workers restart.
  • Horizontal scaling - Add more worker processes to handle increased load without changing code.
  • Scheduling - Process jobs at specific times or intervals (e.g., daily data syncs, weekly reports).
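Scheduled and repeating jobs are typically expressed through BullMQ-style repeat options on the job itself. The cron pattern and job name below are illustrative, not part of the Exulu API; a minimal sketch of the options you might pass when adding a job:

```typescript
// Job options for a repeating job (BullMQ-style `repeat` option).
// Example cron pattern: run every day at 03:00.
const dailySyncOptions = {
  repeat: { pattern: "0 3 * * *" },
};

// These would be passed as the third argument when adding the job, e.g.:
// await config.queue.add("daily-sync", { source: "crm" }, dailySyncOptions);
```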

Quick start

import { ExuluQueues } from "@exulu/backend";

// Register a queue
const embeddingsQueue = ExuluQueues.register(
  "embeddings",           // Queue name
  {
    worker: 5,            // 5 concurrent jobs per worker
    queue: 10             // 10 global concurrent jobs
  },
  10,                     // Rate limit: 10 jobs/second
  180                     // Timeout: 180 seconds
);

// Use the queue
const queueConfig = await embeddingsQueue.use();

// Add a job to the queue
await queueConfig.queue.add("generate-embeddings", {
  contextId: "docs",
  itemId: "item-123"
});

// Create a worker to process jobs
import { Worker } from "bullmq";

const worker = new Worker(
  "embeddings",
  async (job) => {
    console.log("Processing:", job.data);

    // Do the work
    await generateEmbeddings(job.data.contextId, job.data.itemId);

    return { success: true };
  },
  {
    connection: redisServer,
    concurrency: queueConfig.concurrency.worker,
    limiter: {
      max: queueConfig.ratelimit,
      duration: 1000
    }
  }
);

Architecture

BullMQ and Redis

ExuluQueues wraps BullMQ, which uses Redis for:
  • Job storage - Jobs persist in Redis
  • State management - Track job status (waiting, active, completed, failed)
  • Locks - Prevent duplicate processing
  • Pub/Sub - Notify workers of new jobs
import { Queue } from "bullmq";

// ExuluQueues creates BullMQ Queue instances
const queue = new Queue("my-queue", {
  connection: {
    host: "localhost",
    port: 6379
  }
});

Concurrency levels

ExuluQueues supports two concurrency levels:
Worker concurrency - the number of jobs a single worker process handles simultaneously:
concurrency: {
  worker: 5  // Each worker processes 5 jobs at once
}
Higher values mean more throughput per worker, but also more memory/CPU usage.
Queue concurrency - a global cap on the number of jobs running simultaneously across all workers:
concurrency: {
  queue: 20  // At most 20 jobs run at once, queue-wide
}
Example scenario:
  • Queue concurrency: 20
  • Worker concurrency: 5
  • 10 workers running
Each worker can handle 5 jobs (50 in total), but only 20 jobs will run simultaneously across all workers.
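The interaction between the two limits can be made concrete with a small helper (hypothetical, for illustration only): the number of jobs actually running is capped by whichever limit is hit first.

```typescript
// Hypothetical helper: effective parallelism given a queue-level cap and
// per-worker concurrency. The queue limit caps the total across all
// workers; workerLimit * workerCount caps what the workers can take on.
function effectiveConcurrency(
  queueLimit: number,
  workerLimit: number,
  workerCount: number
): number {
  return Math.min(queueLimit, workerLimit * workerCount);
}

console.log(effectiveConcurrency(20, 5, 10)); // scenario above → 20
console.log(effectiveConcurrency(20, 5, 2));  // only 2 workers → 10
```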

Rate limiting

Control job processing speed:
ExuluQueues.register(
  "api-calls",
  { worker: 1, queue: 5 },
  10  // Process max 10 jobs per second
);
Useful for:
  • Staying within API rate limits
  • Preventing database overload
  • Controlling costs (API calls, LLM tokens)
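For capacity planning, a rough back-of-envelope helper (hypothetical, not part of the API) estimates how long a batch takes under a limiter that allows at most max job starts per duration milliseconds, as in the Quick start worker above:

```typescript
// Hypothetical estimate: minimum time (ms) to process `jobs` jobs when at
// most `max` jobs may start per `duration` ms window. Assumes jobs finish
// within their window, so this is a lower bound, not an exact figure.
function minDrainTimeMs(jobs: number, max: number, duration: number): number {
  return Math.ceil(jobs / max) * duration;
}

console.log(minDrainTimeMs(25, 10, 1000)); // 25 jobs at 10/sec → 3000 ms
```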

Common use cases

const embeddingsQueue = ExuluQueues.register(
  "embeddings",
  { worker: 5, queue: 10 },
  10,  // 10 jobs/sec
  300  // 5 min timeout
);

// Used by ExuluContext
const context = new ExuluContext({
  id: "docs",
  embedder: new ExuluEmbedder({
    // ...
    queue: await embeddingsQueue.use()
  })
});

// Jobs are queued automatically
await context.createItem(item, config);

Queue lifecycle

1. Register queue - Define queue configuration with ExuluQueues.register()
   const myQueue = ExuluQueues.register("my-queue", {...});
2. Initialize queue - Call .use() to create the BullMQ Queue instance
   const config = await myQueue.use();
3. Add jobs - Queue jobs for processing
   await config.queue.add("job-name", { data: "..." });
4. Create workers - Start worker processes to handle jobs
   const worker = new Worker("my-queue", processor, options);
5. Process jobs - Workers pick up and execute jobs asynchronously

Integration with ExuluApp

ExuluApp automatically creates workers for registered queues:
const app = new ExuluApp();
await app.create({
  config: {
    workers: {
      enabled: true  // Creates workers for all queues
    }
  },
  contexts: {
    docs: docsContext  // Context with embedder queue
  },
  agents: {
    assistant: agent   // Agent with workflow queue
  }
});

// Workers are automatically created and started
When workers.enabled is true, ExuluApp:
  1. Discovers all queues from contexts, agents, and embedders
  2. Creates Worker instances for each queue
  3. Configures concurrency and rate limits
  4. Starts processing jobs

Job states

Jobs move through these states:
waiting → active → completed
              ↓
           failed → waiting (retry)
  • waiting: Job is queued, not yet picked up
  • active: Worker is currently processing
  • completed: Job finished successfully
  • failed: Job threw an error
Failed jobs automatically retry based on configuration.
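With exponential backoff, the delay between retries grows as base * 2^(attempt - 1). The helper below is a sketch of that schedule for illustration; the attempt count and base delay are example values, not Exulu defaults.

```typescript
// Delay before retry attempt `n` (1-based) under exponential backoff:
// base * 2^(n - 1). With a 1000 ms base: 1000, 2000, 4000, ...
function exponentialBackoffDelay(baseMs: number, attempt: number): number {
  return baseMs * Math.pow(2, attempt - 1);
}

console.log([1, 2, 3].map((n) => exponentialBackoffDelay(1000, n)));
// → [ 1000, 2000, 4000 ]
```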

Monitoring

Built-in telemetry

ExuluQueues includes OpenTelemetry integration:
import { BullMQOtel } from "bullmq-otel";

const queue = new Queue("my-queue", {
  telemetry: new BullMQOtel("exulu-app")
});
This tracks:
  • Job duration
  • Success/failure rates
  • Queue depth
  • Processing latency

Queue inspection

Check queue status programmatically:
const config = await myQueue.use();

// Get job counts
const counts = await config.queue.getJobCounts();
console.log(counts);
// { waiting: 5, active: 2, completed: 100, failed: 3 }

// Get specific jobs
const failedJobs = await config.queue.getFailed();
const activeJobs = await config.queue.getActive();

// Get queue metrics
const concurrency = await config.queue.getGlobalConcurrency();
console.log(`Global concurrency: ${concurrency}`);
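The counts returned by getJobCounts() can feed simple health checks. A hypothetical helper (not part of ExuluQueues) computing the failure rate over finished jobs:

```typescript
type JobCounts = { waiting: number; active: number; completed: number; failed: number };

// Fraction of finished jobs that failed (0 when nothing has finished yet).
function failureRate(counts: JobCounts): number {
  const finished = counts.completed + counts.failed;
  return finished === 0 ? 0 : counts.failed / finished;
}

const rate = failureRate({ waiting: 5, active: 2, completed: 100, failed: 3 });
console.log(rate.toFixed(3)); // 3 / 103 ≈ 0.029
```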

Best practices

  • Right-size concurrency: Start with low concurrency and increase gradually. Monitor CPU/memory usage and adjust.
  • Configure timeouts: Set realistic timeouts based on expected job duration. Too short means premature failures; too long means stuck workers.
  • Redis is required: Queues require a Redis server. Ensure Redis is configured before using queues.
  • Rate limits: Set rate limits based on external service constraints (API limits, database capacity, LLM rate limits).

Redis configuration

ExuluQueues requires Redis connection info:
# Environment variables
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=optional-password
Or configure programmatically:
import { redisServer } from "@exulu/backend";

redisServer.host = "localhost";
redisServer.port = "6379";
redisServer.password = "optional-password";
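The same values can be derived from environment variables with local fallbacks. The redisServer object above is Exulu's; the plain object below is a sketch of the equivalent connection shape that Redis clients generally accept:

```typescript
// Build a Redis connection config from the environment,
// falling back to local defaults when a variable is unset.
const connection = {
  host: process.env.REDIS_HOST ?? "localhost",
  port: Number(process.env.REDIS_PORT ?? 6379),
  password: process.env.REDIS_PASSWORD, // undefined when unset
};
```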

Next steps