Overview
ExuluQueues manages background job queues using BullMQ and Redis. It enables asynchronous processing of long-running tasks like embeddings generation, document processing, scheduled data syncs, and agent workflows. Queues help keep your application responsive by offloading heavy work to background workers.
Key features
BullMQ integration: Built on BullMQ, a robust Redis-based queue system
Concurrency control: Configure worker-level and queue-level concurrency limits
Rate limiting: Control job processing rate to manage API limits
Retries & backoff: Automatic retry with exponential or linear backoff
Timeouts: Set a maximum execution time for jobs
Telemetry: OpenTelemetry integration for monitoring
What is a queue?
A queue is a system for managing background jobs:
Jobs are added to the queue with data and options
Workers process jobs asynchronously in the background
Results are tracked with success/failure status
Failed jobs retry automatically based on configuration
Queues prevent blocking operations from slowing down your application and enable horizontal scaling by adding more workers.
Why use queues?
Tasks like embeddings generation, document processing, or data exports can take seconds or minutes. Queues let you return immediately to users while work continues in the background.
External APIs often have rate limits. Queues control processing speed to stay within limits while maximizing throughput.
Jobs persist in Redis. If a worker crashes, jobs aren’t lost and will retry when workers restart.
Add more worker processes to handle increased load without changing code.
Process jobs at specific times or intervals (e.g., daily data syncs, weekly reports).
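For the scheduled case, BullMQ supports repeatable jobs via the `repeat` job option with a cron pattern. A minimal sketch; the `RepeatableQueue` interface, job name, and data here are illustrative stand-ins, not part of the ExuluQueues API:

```typescript
// Sketch: a recurring job using BullMQ's repeatable-jobs `repeat` option.
// `RepeatableQueue` is a minimal stand-in for a BullMQ Queue (e.g. the
// `queue` returned by `await myQueue.use()`).
interface RepeatableQueue {
  add(
    name: string,
    data: unknown,
    opts?: { repeat?: { pattern: string } }
  ): Promise<unknown>;
}

// Standard cron syntax: run every day at 02:00.
const nightly = { pattern: "0 2 * * *" };

async function scheduleDailySync(queue: RepeatableQueue): Promise<void> {
  await queue.add("daily-sync", { source: "github" }, { repeat: nightly });
}
```

BullMQ re-enqueues the job on each tick of the pattern, so a single `add` call is enough to set up the recurring schedule.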
Quick start
import { ExuluQueues } from "@exulu/backend";

// Register a queue
const embeddingsQueue = ExuluQueues.register(
  "embeddings", // Queue name
  {
    worker: 5,  // 5 concurrent jobs per worker
    queue: 10   // 10 global concurrent jobs
  },
  10,  // Rate limit: 10 jobs/second
  180  // Timeout: 180 seconds
);
// Use the queue
const queueConfig = await embeddingsQueue.use();

// Add a job to the queue
await queueConfig.queue.add("generate-embeddings", {
  contextId: "docs",
  itemId: "item-123"
});
// Create a worker to process jobs
import { Worker } from "bullmq";
import { redisServer } from "@exulu/backend";

const worker = new Worker(
  "embeddings",
  async (job) => {
    console.log("Processing:", job.data);
    // Do the work
    await generateEmbeddings(job.data.contextId, job.data.itemId);
    return { success: true };
  },
  {
    connection: redisServer,
    concurrency: queueConfig.concurrency.worker,
    limiter: {
      max: queueConfig.ratelimit,
      duration: 1000
    }
  }
);
Architecture
BullMQ and Redis
ExuluQueues wraps BullMQ, which uses Redis for:
Job storage - Jobs persist in Redis
State management - Track job status (waiting, active, completed, failed)
Locks - Prevent duplicate processing
Pub/Sub - Notify workers of new jobs
import { Queue } from "bullmq";

// ExuluQueues creates BullMQ Queue instances
const queue = new Queue("my-queue", {
  connection: {
    host: "localhost",
    port: 6379
  }
});
Concurrency levels
ExuluQueues supports two concurrency levels:

Worker concurrency: the number of jobs a single worker process handles simultaneously.

concurrency: {
  worker: 5 // Each worker processes 5 jobs at once
}

Higher values mean more throughput per worker, but more memory and CPU usage.

Queue concurrency: the maximum number of jobs processing across all workers globally.

concurrency: {
  queue: 10 // Max 10 jobs processing across all workers
}

This prevents overwhelming external services or databases.
Example scenario:
Queue concurrency: 20
Worker concurrency: 5
10 workers running
Each worker can handle 5 jobs (50 jobs of combined capacity), but at most 20 jobs will run simultaneously across all workers.
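The effective parallelism is whichever is smaller: the global queue limit, or the combined capacity of all running workers. An illustrative helper (not part of the ExuluQueues API):

```typescript
// Effective parallelism is capped by the smaller of the global queue
// concurrency and the combined capacity of all running workers.
function effectiveParallelism(
  queueLimit: number,
  workers: number,
  perWorker: number
): number {
  return Math.min(queueLimit, workers * perWorker);
}

// The scenario above: queue limit 20, 10 workers × 5 jobs each = 50 capacity.
effectiveParallelism(20, 10, 5); // → 20
```

With only 2 workers the queue limit would no longer bind: `effectiveParallelism(20, 2, 5)` gives 10.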
Rate limiting
Control job processing speed:
ExuluQueues.register(
  "api-calls",
  { worker: 1, queue: 5 },
  10 // Process max 10 jobs per second
);
Useful for:
Staying within API rate limits
Preventing database overload
Controlling costs (API calls, LLM tokens)
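A rate limit also bounds how quickly a backlog drains, which is useful for capacity planning. A rough back-of-envelope helper (illustrative, not part of the API):

```typescript
// At `rate` jobs/second, a backlog of `backlog` jobs takes roughly
// backlog / rate seconds to drain (ignoring per-job processing time
// beyond what the limiter already enforces).
function drainSeconds(backlog: number, rate: number): number {
  return Math.ceil(backlog / rate);
}

// 600 queued jobs at 10 jobs/second:
drainSeconds(600, 10); // → 60
```

If the drain time is too long for your use case, raise the rate limit (within external service constraints) or split work across multiple queues.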
Common use cases
Embeddings generation
Document processing
Scheduled data sync
Agent workflows
const embeddingsQueue = ExuluQueues.register(
  "embeddings",
  { worker: 5, queue: 10 },
  10,  // 10 jobs/sec
  300  // 5 min timeout
);

// Used by ExuluContext
const context = new ExuluContext({
  id: "docs",
  embedder: new ExuluEmbedder({
    // ...
    queue: await embeddingsQueue.use()
  })
});

// Jobs are queued automatically
await context.createItem(item, config);
const processingQueue = ExuluQueues.register(
  "document-processing",
  { worker: 3, queue: 5 },
  5,   // 5 jobs/sec
  600  // 10 min timeout
);

// Used by processor
const context = new ExuluContext({
  id: "docs",
  processor: {
    name: "PDF Processor",
    config: {
      queue: await processingQueue.use(),
      trigger: "onInsert"
    },
    execute: async ({ item }) => {
      const text = await extractPDFText(item.file_s3key);
      return { ...item, content: text };
    }
  }
});
const syncQueue = ExuluQueues.register(
  "data-sync",
  { worker: 1, queue: 1 },
  1,    // 1 job/sec
  1800  // 30 min timeout
);

// Used by ExuluContext source
const context = new ExuluContext({
  id: "github-issues",
  sources: [{
    id: "github",
    name: "GitHub Sync",
    description: "Syncs issues from GitHub",
    config: {
      schedule: "0 */6 * * *", // Every 6 hours
      queue: await syncQueue.use()
    },
    execute: async () => {
      const issues = await fetchGitHubIssues();
      return issues;
    }
  }]
});
const workflowQueue = ExuluQueues.register(
  "agent-workflows",
  { worker: 10, queue: 20 },
  20,  // 20 jobs/sec
  300  // 5 min timeout
);

// Used by ExuluAgent
const agent = new ExuluAgent({
  id: "assistant",
  name: "Assistant",
  workflows: {
    enabled: true,
    queue: await workflowQueue.use()
  }
  // ...
});
Queue lifecycle
Register queue
Define the queue configuration with ExuluQueues.register():
const myQueue = ExuluQueues.register("my-queue", { ... });
Initialize queue
Call .use() to create the BullMQ Queue instance:
const config = await myQueue.use();
Add jobs
Queue jobs for processing:
await config.queue.add("job-name", { data: "..." });
Create workers
Start worker processes to handle jobs:
const worker = new Worker("my-queue", processor, options);
Process jobs
Workers pick up and execute jobs asynchronously.
Integration with ExuluApp
ExuluApp automatically creates workers for registered queues:
const app = new ExuluApp();

await app.create({
  config: {
    workers: {
      enabled: true // Creates workers for all queues
    }
  },
  contexts: {
    docs: docsContext // Context with embedder queue
  },
  agents: {
    assistant: agent // Agent with workflow queue
  }
});

// Workers are automatically created and started
When workers.enabled is true, ExuluApp:
Discovers all queues from contexts, agents, and embedders
Creates Worker instances for each queue
Configures concurrency and rate limits
Starts processing jobs
Job states
Jobs move through these states:
waiting → active → completed
             ↓
           failed → waiting (retry)
waiting : Job is queued, not yet picked up
active : Worker is currently processing
completed : Job finished successfully
failed : Job threw an error
Failed jobs automatically retry based on configuration.
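In BullMQ, retry behavior comes from the `attempts` and `backoff` job options; with exponential backoff, the delay before the n-th retry is roughly `delay * 2^(n-1)`. A sketch (the option names are BullMQ's; the helper is illustrative):

```typescript
// BullMQ job options controlling retries: up to 3 attempts in total,
// with exponentially growing delays between them.
const retryOpts = {
  attempts: 3,
  backoff: { type: "exponential" as const, delay: 1000 },
};

// Illustrative helper: the expected wait (ms) before each retry,
// derived from the options above.
function backoffDelays(opts: typeof retryOpts): number[] {
  return Array.from(
    { length: opts.attempts - 1 },
    (_, i) => opts.backoff.delay * 2 ** i
  );
}

backoffDelays(retryOpts); // → [1000, 2000]
```

These options are passed per job, e.g. `config.queue.add("job-name", data, retryOpts)`.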
Monitoring
Built-in telemetry
ExuluQueues includes OpenTelemetry integration:
import { BullMQOtel } from "bullmq-otel";

const queue = new Queue("my-queue", {
  telemetry: new BullMQOtel("exulu-app")
});
This tracks:
Job duration
Success/failure rates
Queue depth
Processing latency
Queue inspection
Check queue status programmatically:
const config = await myQueue.use();

// Get job counts
const counts = await config.queue.getJobCounts();
console.log(counts);
// { waiting: 5, active: 2, completed: 100, failed: 3 }

// Get specific jobs
const failedJobs = await config.queue.getFailed();
const activeJobs = await config.queue.getActive();

// Get queue metrics
const concurrency = await config.queue.getGlobalConcurrency();
console.log(`Global concurrency: ${concurrency}`);
Best practices
Right-size concurrency : Start with low concurrency and increase gradually. Monitor CPU/memory usage and adjust.
Configure timeouts : Set realistic timeouts based on expected job duration. Too short = premature failures, too long = stuck workers.
Redis is required : Queues require a Redis server. Ensure Redis is configured before using queues.
Rate limits : Set rate limits based on external service constraints (API limits, database capacity, LLM rate limits).
Redis configuration
ExuluQueues requires Redis connection info:
# Environment variables
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=optional-password
Or configure programmatically:
import { redisServer } from "@exulu/backend";

redisServer.host = "localhost";
redisServer.port = "6379";
redisServer.password = "optional-password";
Next steps