Singleton instance
ExuluQueues is exported as a singleton instance:
import { ExuluQueues } from "@exulu/backend";
// Use the singleton directly
const myQueue = ExuluQueues.register("my-queue", ...);
Methods
register()
Registers a new queue with specified configuration.
ExuluQueues.register(
name: string,
concurrency: { worker: number; queue: number },
ratelimit?: number,
timeoutInSeconds?: number
): {
use: () => Promise<ExuluQueueConfig>
}
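name: Name of the queue
concurrency: Concurrency configuration with worker and queue levels
concurrency.worker: Maximum concurrent jobs per worker
concurrency.queue: Global max jobs across all workers
ratelimit: Maximum jobs per second (default: 1)
timeoutInSeconds: Job timeout in seconds (default: 180)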
return
{ use: () => Promise<ExuluQueueConfig> }
Object with use() method to initialize the queue
import { ExuluQueues } from "@exulu/backend";
// Register a queue
const embeddingsQueue = ExuluQueues.register(
"embeddings", // name
{
worker: 5, // worker concurrency
queue: 10 // queue concurrency
},
10, // rate limit: 10 jobs/sec
300 // timeout: 5 minutes
);
// Initialize the queue
const config = await embeddingsQueue.use();
// Add jobs
await config.queue.add("generate-embeddings", {
contextId: "docs",
itemId: "item-123"
});
register() only registers the queue configuration. Call .use() to actually create the BullMQ Queue instance.
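The split between register() and use() can be pictured as a lazy registry. The sketch below is not Exulu's implementation; `LazyRegistry`, `FakeQueue`, and the `created` counter are hypothetical names, and caching the result of the first `use()` call is an assumption made for illustration. It only shows the pattern: registering stores settings, and the underlying resource is created when `use()` is first called.

```typescript
// Hypothetical stand-in for a BullMQ Queue; only the name matters here.
type FakeQueue = { name: string };

type Registration = { use: () => Promise<FakeQueue> };

class LazyRegistry {
  // Counts how many queue objects were actually created.
  created = 0;
  private readonly registrations = new Map<string, Registration>();

  register(name: string): Registration {
    let cached: Promise<FakeQueue> | undefined;
    const registration: Registration = {
      use: () => {
        // Create on the first call only; later calls reuse the same promise.
        if (cached === undefined) {
          this.created += 1;
          cached = Promise.resolve({ name });
        }
        return cached;
      },
    };
    this.registrations.set(name, registration);
    return registration;
  }
}

const registry = new LazyRegistry();
const lazyQueue = registry.register("embeddings");
const createdAfterRegister = registry.created; // nothing created yet
void lazyQueue.use();
void lazyQueue.use();
const createdAfterUse = registry.created; // one instance, reused
```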
queue()
Retrieves a queue configuration by name.
ExuluQueues.queue(name: string): {
queue: Queue;
ratelimit: number;
concurrency: {
worker: number;
queue: number;
};
} | undefined
Queue configuration object, or undefined if not found
// Get a previously registered queue
const config = ExuluQueues.queue("embeddings");
if (config) {
console.log(config.queue.name); // "embeddings"
console.log(config.ratelimit); // 10
console.log(config.concurrency.worker); // 5
console.log(config.concurrency.queue); // 10
}
Use this to access queues that were registered elsewhere in your application.
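Because queue() can return undefined, callers that cannot proceed without the queue may prefer to fail loudly. The following is a minimal sketch of such a wrapper; `requireQueue` is a hypothetical helper, not part of @exulu/backend, and the Map-based lookup is stand-in data so the example runs on its own.

```typescript
// Wraps any name-based lookup and turns a missing entry into an error.
function requireQueue<T>(
  lookup: (name: string) => T | undefined,
  name: string
): T {
  const config = lookup(name);
  if (config === undefined) {
    throw new Error(`Queue "${name}" was not found`);
  }
  return config;
}

// Stand-in lookup for illustration; in an application this could be
// (name) => ExuluQueues.queue(name).
const known = new Map<string, { ratelimit: number }>([
  ["embeddings", { ratelimit: 10 }],
]);
const lookup = (name: string) => known.get(name);

const found = requireQueue(lookup, "embeddings");
console.log(found.ratelimit); // 10
```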
Properties
queues
QueueConfig[]
Array of all initialized queue configurations
// Access all queues
ExuluQueues.queues.forEach(config => {
console.log(`Queue: ${config.queue.name}`);
console.log(`Rate limit: ${config.ratelimit} jobs/sec`);
console.log(`Worker concurrency: ${config.concurrency.worker}`);
console.log(`Queue concurrency: ${config.concurrency.queue}`);
console.log(`Timeout: ${config.timeoutInSeconds}s`);
});
list
Map<string, QueueRegistration>
Map of registered queue configurations (before initialization)
// Access registered queues
const registration = ExuluQueues.list.get("embeddings");
if (registration) {
console.log(registration.name); // "embeddings"
console.log(registration.concurrency.worker); // 5
console.log(registration.concurrency.queue); // 10
console.log(registration.ratelimit); // 10
console.log(registration.timeoutInSeconds); // 300
// Initialize the queue
const config = await registration.use();
}
// List all registered queues
for (const [name, registration] of ExuluQueues.list) {
console.log(`Registered queue: ${name}`);
}
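Since list holds registrations and queues holds initialized configurations, comparing the two shows which queues have been registered but not yet initialized. This is a sketch under that reading of the two properties; `pendingQueueNames` is a hypothetical helper and the arguments below are stand-in data.

```typescript
// Returns the names that are registered but have no initialized counterpart.
function pendingQueueNames(
  registeredNames: Iterable<string>,
  initialized: ReadonlyArray<{ queue: { name: string } }>
): string[] {
  const ready = new Set(initialized.map((config) => config.queue.name));
  return Array.from(registeredNames).filter((name) => !ready.has(name));
}

// Stand-in data; in an application the arguments could be
// ExuluQueues.list.keys() and ExuluQueues.queues.
const pending = pendingQueueNames(
  ["embeddings", "processing", "sync"],
  [{ queue: { name: "embeddings" } }]
);
console.log(pending); // the two names without an initialized queue
```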
Queue configuration object
When you call .use(), you get an ExuluQueueConfig object:
type ExuluQueueConfig = {
queue: Queue; // BullMQ Queue instance
ratelimit: number; // Jobs per second
timeoutInSeconds: number; // Job timeout
concurrency: {
worker: number; // Per-worker concurrency
queue: number; // Global concurrency
};
retries?: number; // Optional: retry count
backoff?: { // Optional: backoff strategy
type: "exponential" | "linear";
delay: number;
};
};
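The ratelimit and concurrency.queue values both cap throughput, and which one binds depends on how long jobs take. As a back-of-the-envelope sketch: sustained jobs per second cannot exceed the rate limit, nor the global concurrency divided by the average job duration. `throughputCeiling` and `avgJobSeconds` are hypothetical names, and the estimate assumes enough worker processes are running to reach the global concurrency limit.

```typescript
type Limits = {
  ratelimit: number; // jobs per second
  concurrency: { worker: number; queue: number };
};

// Upper bound on sustained jobs/sec: the rate limit, or the number of
// jobs that can run at once divided by how long each one takes.
function throughputCeiling(limits: Limits, avgJobSeconds: number): number {
  if (avgJobSeconds <= 0) {
    throw new RangeError("avgJobSeconds must be positive");
  }
  const concurrencyBound = limits.concurrency.queue / avgJobSeconds;
  return Math.min(limits.ratelimit, concurrencyBound);
}

const embeddingLimits: Limits = {
  ratelimit: 10,
  concurrency: { worker: 5, queue: 10 },
};

const fastJobs = throughputCeiling(embeddingLimits, 0.5); // min(10, 20) = 10
const slowJobs = throughputCeiling(embeddingLimits, 4); // min(10, 2.5) = 2.5
```

With half-second jobs the rate limit is the bottleneck; with four-second jobs the global concurrency is.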
Using the Queue instance
The queue property is a BullMQ Queue instance with full API access:
const config = await myQueue.use();
// Add jobs
await config.queue.add("job-name", { data: "..." });
// Add delayed jobs
await config.queue.add("job-name", { data: "..." }, {
delay: 5000 // Delay 5 seconds
});
// Add scheduled jobs
await config.queue.add("job-name", { data: "..." }, {
repeat: {
pattern: "0 0 * * *" // Daily at midnight
}
});
// Get job counts
const counts = await config.queue.getJobCounts();
console.log(counts);
// e.g. { waiting: 5, active: 2, completed: 100, failed: 3 }, plus counts for other states such as delayed
// Get jobs
const waitingJobs = await config.queue.getWaiting();
const activeJobs = await config.queue.getActive();
const completedJobs = await config.queue.getCompleted();
const failedJobs = await config.queue.getFailed();
// Get the global concurrency limit
const concurrency = await config.queue.getGlobalConcurrency();
console.log(`Global concurrency: ${concurrency}`);
// Update global concurrency
await config.queue.setGlobalConcurrency(20);
// Pause/resume queue
await config.queue.pause();
await config.queue.resume();
// Remove jobs
await config.queue.clean(24 * 3600 * 1000, 0, "completed"); // Remove completed jobs older than 24h
// Drain queue (remove all waiting jobs)
await config.queue.drain();
// Close queue
await config.queue.close();
Usage examples
Basic queue registration and usage
import { ExuluQueues } from "@exulu/backend";
// Register queue
const myQueue = ExuluQueues.register(
"my-queue",
{ worker: 5, queue: 10 },
10,
180
);
// Initialize
const config = await myQueue.use();
// Add a job
await config.queue.add("process-item", {
itemId: "123",
action: "process"
});
// Add a job with options
await config.queue.add(
"process-item",
{ itemId: "456" },
{
priority: 1, // Highest priority (in BullMQ, lower numbers run first)
delay: 5000, // Delay 5 seconds
attempts: 3, // Up to 3 attempts in total, including the first
backoff: {
type: "exponential",
delay: 1000
}
}
);
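With attempts set to 3 and an exponential backoff of 1000 ms, the waits between tries can be worked out by hand. The sketch below assumes BullMQ's built-in exponential strategy without jitter, which its documentation describes as delay * 2^(attemptsMade - 1), and that attempts counts the first try as well. `exponentialDelay` and `retrySchedule` are illustrative helper names, not part of BullMQ or Exulu.

```typescript
// Delay before the retry that follows failed attempt number `attemptsMade`,
// assuming the formula delay * 2^(attemptsMade - 1).
function exponentialDelay(baseDelayMs: number, attemptsMade: number): number {
  return Math.round(baseDelayMs * Math.pow(2, attemptsMade - 1));
}

// Waits between attempts for a job allowed `attempts` tries in total.
function retrySchedule(baseDelayMs: number, attempts: number): number[] {
  const waits: number[] = [];
  for (let failed = 1; failed < attempts; failed += 1) {
    waits.push(exponentialDelay(baseDelayMs, failed));
  }
  return waits;
}

const schedule = retrySchedule(1000, 3); // [1000, 2000]
```

Under these assumptions the job waits 1 second after the first failure and 2 seconds after the second; a third failure is final.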
With ExuluEmbedder
import { ExuluQueues, ExuluEmbedder, ExuluContext } from "@exulu/backend";
// Register queue for embeddings
const embeddingsQueue = ExuluQueues.register(
"embeddings",
{ worker: 5, queue: 10 },
10, // 10 jobs/sec
300 // 5 min timeout
);
// Create embedder with queue
const embedder = new ExuluEmbedder({
id: "openai_embedder",
name: "OpenAI Embeddings",
provider: "openai",
model: "text-embedding-3-small",
vectorDimensions: 1536,
queue: await embeddingsQueue.use()
});
// Embeddings are now queued automatically
const context = new ExuluContext({
id: "docs",
embedder: embedder,
// ...
});
await context.createItem(item, config);
// Embeddings generation is queued, not blocking
With ExuluContext processor
import { ExuluQueues, ExuluContext } from "@exulu/backend";
// Register processing queue
const processingQueue = ExuluQueues.register(
"document-processing",
{ worker: 3, queue: 5 },
5, // 5 jobs/sec
600 // 10 min timeout
);
// Use in context processor
const context = new ExuluContext({
id: "documents",
processor: {
name: "PDF Text Extractor",
config: {
queue: await processingQueue.use(),
trigger: "onInsert",
generateEmbeddings: true,
timeoutInSeconds: 600,
retries: 3,
backoff: {
type: "exponential",
delay: 2000
}
},
execute: async ({ item, utils }) => {
const text = await utils.storage.extractText(item.document_s3key);
return { ...item, content: text };
}
},
fields: [
{ name: "title", type: "text", required: true },
{ name: "document", type: "file", allowedFileTypes: [".pdf"] },
{ name: "content", type: "longtext" }
],
sources: []
});
With ExuluContext data source
import { ExuluQueues, ExuluContext } from "@exulu/backend";
// Register sync queue
const syncQueue = ExuluQueues.register(
"github-sync",
{ worker: 1, queue: 1 },
1, // 1 job/sec
1800 // 30 min timeout
);
// Use in context source
const context = new ExuluContext({
id: "github_issues",
sources: [{
id: "github",
name: "GitHub Issues Sync",
description: "Syncs issues from GitHub repository",
config: {
schedule: "0 */6 * * *", // Every 6 hours
queue: await syncQueue.use(),
retries: 3,
backoff: {
type: "exponential",
delay: 2000
}
},
execute: async ({ exuluConfig }) => {
// fetchGitHubIssues() stands in for your own GitHub API client
const issues = await fetchGitHubIssues();
return issues.map(issue => ({
external_id: issue.id,
name: issue.title,
content: issue.body,
metadata: { labels: issue.labels }
}));
}
}],
fields: [
{ name: "title", type: "text", required: true },
{ name: "content", type: "longtext", required: true },
{ name: "metadata", type: "json" }
]
});
With ExuluAgent workflows
import { ExuluQueues, ExuluAgent } from "@exulu/backend";
import { createOpenAI } from "@ai-sdk/openai";
// Register workflow queue
const workflowQueue = ExuluQueues.register(
"agent-workflows",
{ worker: 10, queue: 20 },
20, // 20 jobs/sec
300 // 5 min timeout
);
// Use in agent
const agent = new ExuluAgent({
id: "assistant",
name: "Assistant",
type: "agent",
description: "AI assistant with background workflows",
provider: "openai",
config: {
name: "gpt-4o",
model: {
create: ({ apiKey }) => createOpenAI({ apiKey })("gpt-4o")
},
instructions: "You are a helpful assistant."
},
workflows: {
enabled: true,
queue: await workflowQueue.use()
},
capabilities: {
text: true,
images: [],
files: [],
audio: [],
video: []
}
});
Monitoring queue status
const config = await myQueue.use();
// Get job counts
const counts = await config.queue.getJobCounts();
console.log(`Waiting: ${counts.waiting}`);
console.log(`Active: ${counts.active}`);
console.log(`Completed: ${counts.completed}`);
console.log(`Failed: ${counts.failed}`);
// Get failed jobs
const failedJobs = await config.queue.getFailed();
for (const job of failedJobs) {
console.log(`Job ${job.id} failed:`, job.failedReason);
console.log(`Data:`, job.data);
console.log(`Attempts: ${job.attemptsMade}/${job.opts.attempts}`);
}
// Get active jobs
const activeJobs = await config.queue.getActive();
console.log(`Currently processing ${activeJobs.length} jobs`);
// Check global concurrency
const concurrency = await config.queue.getGlobalConcurrency();
console.log(`Global concurrency: ${concurrency}`);
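When many jobs fail, grouping them by failure reason often shows whether one error dominates. The following is a minimal sketch; `countFailuresByReason` and `FailedJobLike` are hypothetical names, and the input is stand-in data shaped like the `id` and `failedReason` fields used above.

```typescript
// Minimal shape of a failed job for this sketch.
type FailedJobLike = { id?: string; failedReason?: string };

// Counts failed jobs per failure reason, most frequent first.
function countFailuresByReason(
  jobs: ReadonlyArray<FailedJobLike>
): Array<[string, number]> {
  const totals = new Map<string, number>();
  for (const job of jobs) {
    const reason = job.failedReason ?? "unknown";
    totals.set(reason, (totals.get(reason) ?? 0) + 1);
  }
  return Array.from(totals.entries()).sort((a, b) => b[1] - a[1]);
}

// Stand-in data; in an application this could be the result of
// config.queue.getFailed().
const summary = countFailuresByReason([
  { id: "1", failedReason: "timeout" },
  { id: "2", failedReason: "rate limited" },
  { id: "3", failedReason: "timeout" },
]);
```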
Managing queue lifecycle
const config = await myQueue.use();
// Pause processing (workers stop picking up new jobs)
await config.queue.pause();
console.log("Queue paused");
// Resume processing
await config.queue.resume();
console.log("Queue resumed");
// Clean old completed jobs
await config.queue.clean(
24 * 3600 * 1000, // Older than 24 hours
100, // Max 100 jobs
"completed" // Job type
);
// Clean old failed jobs
await config.queue.clean(
7 * 24 * 3600 * 1000, // Older than 7 days
0, // All jobs
"failed"
);
// Drain all waiting jobs
await config.queue.drain();
console.log("All waiting jobs removed");
// Close queue connection
await config.queue.close();
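For shutdown, it can help to close every queue in one step and keep going even if one of them fails to close. This is a sketch only; `closeAllQueues` and `Closable` are hypothetical names, and the structural type lists just the two members the helper needs.

```typescript
// Minimal shape needed from each initialized queue configuration.
type Closable = { queue: { name: string; close: () => Promise<void> } };

// Closes every queue in parallel and returns the names of those that
// failed to close, so one failure does not stop the others.
async function closeAllQueues(
  configs: ReadonlyArray<Closable>
): Promise<string[]> {
  const outcomes = await Promise.all(
    configs.map(async (config) => {
      try {
        await config.queue.close();
        return null;
      } catch {
        return config.queue.name;
      }
    })
  );
  return outcomes.filter((name): name is string => name !== null);
}

// In an application this might be wired to a signal handler, e.g.
// process.once("SIGTERM", () => { void closeAllQueues(ExuluQueues.queues); });
```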
Multiple queues
// Register multiple queues for different purposes
const embeddingsQueue = ExuluQueues.register(
"embeddings",
{ worker: 5, queue: 10 },
10,
300
);
const processingQueue = ExuluQueues.register(
"processing",
{ worker: 3, queue: 5 },
5,
600
);
const syncQueue = ExuluQueues.register(
"sync",
{ worker: 1, queue: 1 },
1,
1800
);
// Initialize all queues
const [embeddingsConfig, processingConfig, syncConfig] = await Promise.all([
embeddingsQueue.use(),
processingQueue.use(),
syncQueue.use()
]);
// Use them independently
await embeddingsConfig.queue.add("embed", { /* ... */ });
await processingConfig.queue.add("process", { /* ... */ });
await syncConfig.queue.add("sync", { /* ... */ });
// Monitor all queues
for (const queueConfig of ExuluQueues.queues) {
const counts = await queueConfig.queue.getJobCounts();
console.log(`${queueConfig.queue.name}:`, counts);
}
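The monitoring loop above awaits each queue in turn. With many queues, the counts can also be fetched in parallel. The sketch below assumes only that each configuration exposes `queue.name` and `queue.getJobCounts()`; `collectJobCounts` and `CountsSource` are hypothetical names.

```typescript
type CountsSource = {
  queue: { name: string; getJobCounts: () => Promise<Record<string, number>> };
};

// Fetches job counts for all queues in parallel, keyed by queue name.
async function collectJobCounts(
  configs: ReadonlyArray<CountsSource>
): Promise<Map<string, Record<string, number>>> {
  const entries = await Promise.all(
    configs.map(
      async (config): Promise<[string, Record<string, number>]> => [
        config.queue.name,
        await config.queue.getJobCounts(),
      ]
    )
  );
  return new Map(entries);
}

// In an application this could be called as
// collectJobCounts(ExuluQueues.queues).
```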
Retrieving registered queues
// Later in your code, retrieve a queue by name
const config = ExuluQueues.queue("embeddings");
if (config) {
// Add a job to the existing queue
await config.queue.add("new-job", { data: "..." });
}
// List all registered queues
for (const [name, registration] of ExuluQueues.list) {
console.log(`Queue: ${name}`);
console.log(` Worker concurrency: ${registration.concurrency.worker}`);
console.log(` Queue concurrency: ${registration.concurrency.queue}`);
console.log(` Rate limit: ${registration.ratelimit} jobs/sec`);
console.log(` Timeout: ${registration.timeoutInSeconds}s`);
}
Type definitions
// Queue registration return type
type QueueRegistration = {
use: () => Promise<ExuluQueueConfig>;
};
// Queue configuration after initialization
type ExuluQueueConfig = {
queue: Queue; // BullMQ Queue instance
ratelimit: number; // Jobs per second
timeoutInSeconds: number; // Job timeout
concurrency: {
worker: number; // Per-worker concurrency
queue: number; // Global concurrency
};
retries?: number; // Optional retry count
backoff?: { // Optional backoff strategy
type: "exponential" | "linear";
delay: number; // Milliseconds
};
};
// Queue config in ExuluQueues.queues array
type QueueConfig = {
queue: Queue;
ratelimit: number;
concurrency: {
worker: number;
queue: number;
};
timeoutInSeconds: number;
};
Best practices
Initialize queues early: Call .use() during application startup to ensure queues are ready before they’re needed.
Reuse queue configurations: Register queues once and retrieve them with ExuluQueues.queue() when needed elsewhere.
Close queues on shutdown: Properly close queue connections when shutting down your application to prevent resource leaks.
Monitor queue depth: Regularly check job counts to ensure queues aren’t backing up. A waiting count that keeps growing means jobs arrive faster than workers process them; raise concurrency or the rate limit, or add workers.
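One way to turn a waiting count into a signal is to estimate how long the backlog would take to clear at the current processing rate. This is a rough sketch; `estimateDrainSeconds`, `isBackingUp`, and the 60-second threshold are hypothetical, and the rate is assumed to be measured or taken from the queue's rate limit.

```typescript
// Rough time to clear the backlog at a steady processing rate.
function estimateDrainSeconds(waiting: number, jobsPerSecond: number): number {
  if (jobsPerSecond <= 0) return Infinity;
  return waiting / jobsPerSecond;
}

// Flags a queue whose backlog would take longer than allowed to clear.
function isBackingUp(
  waiting: number,
  jobsPerSecond: number,
  maxDrainSeconds: number
): boolean {
  return estimateDrainSeconds(waiting, jobsPerSecond) > maxDrainSeconds;
}

const calm = isBackingUp(50, 10, 60); // 5 s to drain, within the limit
const swamped = isBackingUp(6000, 10, 60); // 600 s to drain, over the limit
```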
Next steps