Queue registration
Register queues using the ExuluQueues.register() method:
ExuluQueues.register(
name: string,
concurrency: { worker: number; queue: number },
ratelimit?: number,
timeoutInSeconds?: number
)
Parameters
name
Unique queue name, used to identify the queue and its workers.
ExuluQueues.register("embeddings", ...)
Queue names should be descriptive and unique across your application. Common patterns: embeddings, document-processing, data-sync, agent-workflows.
concurrency
Concurrency configuration with worker and queue levels
concurrency: {
worker: number, // Jobs per worker process
queue: number // Global max jobs across all workers
}
concurrency.worker
Number of jobs a single worker process can handle simultaneously
concurrency: {
worker: 5 // Each worker handles 5 jobs at once
}
Guidelines:
- CPU-bound tasks: 1-2 per CPU core
- I/O-bound tasks: 5-20 per worker
- Memory-intensive tasks: 1-5 per worker
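The rules of thumb above can be sketched as a small helper (hypothetical names; the numbers are starting points to tune, not library defaults):

```typescript
// Hypothetical helper applying the rule-of-thumb ranges above:
// CPU-bound => 1-2 per core, I/O-bound => 5-20, memory-intensive => 1-5.
type TaskProfile = "cpu" | "io" | "memory";

function suggestedWorkerConcurrency(profile: TaskProfile, cpuCores: number): number {
  switch (profile) {
    case "cpu":
      return Math.max(1, cpuCores); // start at 1x cores, raise toward 2x if headroom
    case "io":
      return 10;                    // middle of the 5-20 range
    case "memory":
      return 2;                     // low end of the 1-5 range
  }
}

console.log(suggestedWorkerConcurrency("cpu", 4)); // 4 -- starting point for a 4-core host
```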
concurrency.queue
Maximum jobs processing globally across all workers
concurrency: {
queue: 10 // Max 10 jobs across all workers
}
Guidelines:
- API rate limits: Match your API’s concurrency limit
- Database load: 5-50 depending on DB capacity
- External service: Check service documentation
ratelimit
Maximum jobs processed per second (default: 1)
ExuluQueues.register(
"api-calls",
{ worker: 5, queue: 10 },
10 // Process 10 jobs per second
)
Use cases:
- API rate limits (e.g., OpenAI: 10 req/sec)
- Cost control (limit LLM calls per second)
- Database protection (prevent overload)
timeoutInSeconds
Maximum execution time for a job in seconds (default: 180 = 3 minutes)
ExuluQueues.register(
"long-running-tasks",
{ worker: 2, queue: 5 },
1,
600 // 10 minute timeout
)
Guidelines:
- Quick tasks: 30-60 seconds
- Document processing: 120-300 seconds
- Data sync: 300-1800 seconds
- Large embeddings: 600-3600 seconds
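These guidelines amount to estimating the expected runtime and adding headroom. A tiny hypothetical helper (a 2x buffer is an assumption, not a library rule):

```typescript
// Hypothetical helper: pick a timeout as expected duration plus headroom.
// A too-tight timeout kills healthy jobs; generous buffers are cheap.
function timeoutWithBuffer(expectedSeconds: number, factor = 2): number {
  return Math.ceil(expectedSeconds * factor);
}

console.log(timeoutWithBuffer(30));  // 60 -- quick task
console.log(timeoutWithBuffer(300)); // 600 -- data sync
```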
Configuration examples
Embeddings generation queue
import { ExuluQueues } from "@exulu/backend";
const embeddingsQueue = ExuluQueues.register(
"embeddings",
{
worker: 5, // 5 concurrent embeddings per worker
queue: 10 // Max 10 embeddings globally
},
10, // 10 jobs/second (OpenAI rate limit)
300 // 5 minute timeout
);
// Use with ExuluEmbedder
const embedder = new ExuluEmbedder({
id: "openai_embedder",
name: "OpenAI Embeddings",
provider: "openai",
model: "text-embedding-3-small",
vectorDimensions: 1536,
queue: await embeddingsQueue.use()
});
Document processing queue
const processingQueue = ExuluQueues.register(
"document-processing",
{
worker: 3, // 3 documents per worker
queue: 5 // Max 5 documents globally
},
5, // 5 jobs/second
600 // 10 minute timeout for large docs
);
// Use with ExuluContext processor
const context = new ExuluContext({
id: "documents",
processor: {
name: "PDF Text Extractor",
config: {
queue: await processingQueue.use(),
trigger: "onInsert",
generateEmbeddings: true
},
execute: async ({ item, utils }) => {
const text = await utils.storage.extractText(item.document_s3key);
return { ...item, content: text };
}
}
});
Data sync queue
const syncQueue = ExuluQueues.register(
"github-sync",
{
worker: 1, // Sequential processing
queue: 1 // One sync at a time
},
1, // 1 job/second (no rush)
1800 // 30 minute timeout
);
// Use with ExuluContext source
const context = new ExuluContext({
id: "github_issues",
sources: [{
id: "github",
name: "GitHub Issues Sync",
description: "Syncs issues from GitHub",
config: {
schedule: "0 */6 * * *", // Every 6 hours
queue: await syncQueue.use(),
retries: 3,
backoff: {
type: "exponential",
delay: 2000
}
},
execute: async ({ exuluConfig }) => {
const issues = await fetchGitHubIssues();
return issues.map(issue => ({
external_id: issue.id,
name: issue.title,
content: issue.body
}));
}
}]
});
Agent workflow queue
const workflowQueue = ExuluQueues.register(
"agent-workflows",
{
worker: 10, // 10 concurrent agent tasks per worker
queue: 20 // Max 20 agent tasks globally
},
20, // 20 jobs/second
300 // 5 minute timeout
);
// Use with ExuluAgent
const agent = new ExuluAgent({
id: "assistant",
name: "Assistant",
type: "agent",
description: "AI assistant",
provider: "openai",
config: { /* ... */ },
workflows: {
enabled: true,
queue: await workflowQueue.use()
},
capabilities: { text: true, images: [], files: [], audio: [], video: [] }
});
High-throughput queue
const highThroughputQueue = ExuluQueues.register(
"bulk-operations",
{
worker: 20, // 20 concurrent jobs per worker
queue: 100 // Max 100 jobs globally
},
50, // 50 jobs/second
60 // 1 minute timeout
);
// For fast, lightweight operations
const config = await highThroughputQueue.use();
await config.queue.add("process-item", {
itemId: "item-123"
});
Low-throughput, long-running queue
const batchQueue = ExuluQueues.register(
"nightly-batch",
{
worker: 1, // Sequential processing
queue: 1 // One at a time
},
0.1, // 1 job every 10 seconds
7200 // 2 hour timeout
);
// For expensive, long-running batch jobs
const config = await batchQueue.use();
await config.queue.add("nightly-report", {
date: new Date()
});
API-constrained queue
// OpenAI has rate limits: 10 req/sec for most tiers
const openaiQueue = ExuluQueues.register(
"openai-api",
{
worker: 3, // 3 concurrent requests per worker
queue: 10 // Max 10 requests globally
},
9, // 9 req/sec (under the limit)
120 // 2 minute timeout
);
// Anthropic has different limits: 5 req/sec
const anthropicQueue = ExuluQueues.register(
"anthropic-api",
{
worker: 2,
queue: 5
},
4, // 4 req/sec (under the limit)
120
);
Queue configuration object
After registration, call .use() to get the queue configuration:
const myQueue = ExuluQueues.register("my-queue", ...);
const config = await myQueue.use();
// config has this structure:
type ExuluQueueConfig = {
queue: Queue; // BullMQ Queue instance
ratelimit: number; // Jobs per second
timeoutInSeconds: number; // Job timeout
concurrency: {
worker: number; // Per-worker concurrency
queue: number; // Global concurrency
};
retries?: number; // Retry count (optional)
backoff?: { // Backoff strategy (optional)
type: "exponential" | "linear";
delay: number; // Delay in milliseconds
};
};
Retry and backoff configuration
Configure retries and backoff in ExuluContext sources:
const context = new ExuluContext({
id: "data",
sources: [{
id: "api-source",
name: "API Data Source",
description: "Fetches data from external API",
config: {
queue: await myQueue.use(),
retries: 5, // Retry up to 5 times
backoff: {
type: "exponential", // exponential or linear
delay: 1000 // Start with 1 second delay
}
},
execute: async () => {
const data = await fetchFromAPI();
return data;
}
}]
});
Exponential backoff
Delay doubles after each retry:
backoff: {
type: "exponential",
delay: 1000 // 1s, 2s, 4s, 8s, 16s, ...
}
Use for: Temporary failures, API rate limits, transient network issues
Linear backoff
Fixed delay between retries:
backoff: {
type: "linear",
delay: 5000 // 5s, 5s, 5s, 5s, ...
}
Use for: Predictable delays, scheduled retry windows
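The two strategies produce different delay schedules. A sketch of the schedule each one yields (illustration only, not the library's internal implementation):

```typescript
// Delay before each retry attempt, in milliseconds.
// Exponential doubles the base delay per attempt; linear repeats it.
function backoffDelays(
  type: "exponential" | "linear",
  delayMs: number,
  retries: number
): number[] {
  return Array.from({ length: retries }, (_, attempt) =>
    type === "exponential" ? delayMs * 2 ** attempt : delayMs
  );
}

console.log(backoffDelays("exponential", 1000, 5)); // [1000, 2000, 4000, 8000, 16000]
console.log(backoffDelays("linear", 5000, 4));      // [5000, 5000, 5000, 5000]
```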
Concurrency tuning
Start conservative
Begin with low concurrency and increase gradually:
// Starting point
ExuluQueues.register(
"new-queue",
{ worker: 1, queue: 1 }, // Start with 1
1,
180
);
// After testing, increase
ExuluQueues.register(
"new-queue",
{ worker: 5, queue: 10 }, // Scale up
10,
180
);
Monitor and adjust
Watch these metrics:
- CPU usage: High CPU? Reduce worker concurrency
- Memory usage: High memory? Reduce worker concurrency
- Queue depth: Jobs piling up? Increase concurrency
- Error rate: Many failures? Reduce concurrency or rate limit
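The decision table above can be encoded as a simple heuristic (hypothetical types and thresholds; adapt them to your own monitoring):

```typescript
// Hypothetical tuning heuristic based on the metrics listed above.
interface QueueMetrics {
  cpuPercent: number;
  memoryPercent: number;
  waitingJobs: number;
  errorRate: number; // fraction of jobs failing, 0-1
}

function tuningSuggestion(m: QueueMetrics): string {
  if (m.errorRate > 0.1) return "reduce concurrency or rate limit";
  if (m.cpuPercent > 90 || m.memoryPercent > 90) return "reduce worker concurrency";
  if (m.waitingJobs > 1000) return "increase concurrency";
  return "no change";
}

console.log(tuningSuggestion({ cpuPercent: 95, memoryPercent: 40, waitingJobs: 0, errorRate: 0 }));
```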
Concurrency patterns
Balanced
// Good for general-purpose queues
concurrency: {
  worker: 5, // Moderate per-worker
  queue: 20 // Higher global limit
}
Multiple workers can run without hitting the global limit.
Worker-heavy
// Good for I/O-bound tasks with no global constraints
concurrency: {
  worker: 20, // High per-worker
  queue: 1000 // Very high global limit
}
Workers max out before hitting the global limit.
Queue-constrained
// Good for external API rate limits
concurrency: {
  worker: 2, // Low per-worker
  queue: 10 // Strict global limit
}
Global limit enforced across all workers.
Sequential
// Good for ordered processing or single-threaded tasks
concurrency: {
  worker: 1,
  queue: 1
}
One job at a time, guaranteed order.
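Across all four patterns the same arithmetic applies: the effective parallelism is whichever limit is reached first. A one-line sketch:

```typescript
// Effective parallelism is the smaller of what the workers can
// collectively run and the global queue limit.
function effectiveConcurrency(
  workers: number,
  perWorker: number,
  queueLimit: number
): number {
  return Math.min(workers * perWorker, queueLimit);
}

console.log(effectiveConcurrency(4, 5, 20)); // 20 -- balanced: global limit is the cap
console.log(effectiveConcurrency(2, 2, 10)); // 4 -- workers max out first
```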
Rate limit strategies
API-based
Match your API provider’s limits:
// OpenAI Tier 4: 10,000 req/min = 166 req/sec
ExuluQueues.register("openai", { worker: 10, queue: 100 }, 150);
// Anthropic Pro: 5 req/sec
ExuluQueues.register("anthropic", { worker: 2, queue: 5 }, 4);
// Google Gemini: 60 req/min = 1 req/sec
ExuluQueues.register("gemini", { worker: 1, queue: 1 }, 0.9);
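Per-minute quotas convert to the per-second ratelimit argument with a safety margin, as in the examples above. A hypothetical converter (the 0.9 margin is an assumption):

```typescript
// Convert a provider's per-minute quota to a per-second ratelimit,
// keeping headroom under the hard limit.
function rateLimitFromRpm(requestsPerMinute: number, margin = 0.9): number {
  return Math.floor((requestsPerMinute / 60) * margin);
}

console.log(rateLimitFromRpm(10000)); // 150 -- matches the OpenAI example above
console.log(rateLimitFromRpm(300));   // 4 -- 5 req/sec quota with headroom
```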
Cost-based
Control spending by limiting throughput:
// Limit expensive LLM calls
ExuluQueues.register(
"gpt4-calls",
{ worker: 1, queue: 2 },
0.5 // 0.5 jobs/sec = 1800 calls/hour
);
// Track costs:
// 1800 calls/hr × $0.03/call = $54/hr max
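The cost ceiling in that comment falls out of the ratelimit directly: jobs/sec × 3600 × cost/call. As a helper (hypothetical name; the $0.03/call figure is the example's assumption):

```typescript
// Upper bound on hourly spend implied by a ratelimit.
function maxHourlyCost(jobsPerSecond: number, costPerCall: number): number {
  return jobsPerSecond * 3600 * costPerCall;
}

console.log(maxHourlyCost(0.5, 0.03)); // 54 -- the $54/hr ceiling from the comment above
```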
Database-based
Prevent database overload:
// PostgreSQL can handle ~100-200 concurrent connections
ExuluQueues.register(
"db-queries",
{ worker: 10, queue: 50 },
100 // 100 queries/sec
);
Timeout configuration
Set timeouts based on expected duration + buffer:
// Quick operations (embeddings)
ExuluQueues.register("embeddings", { ... }, 10, 60); // 1 min
// Medium operations (document parsing)
ExuluQueues.register("parsing", { ... }, 5, 300); // 5 min
// Long operations (data sync)
ExuluQueues.register("sync", { ... }, 1, 1800); // 30 min
// Very long operations (large batch jobs)
ExuluQueues.register("batch", { ... }, 1, 7200); // 2 hours
Jobs that exceed the timeout are terminated and marked as failed. Ensure timeouts are realistic for your workload.
Redis configuration
ExuluQueues requires Redis connection information:
# .env file
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password # Optional
Or configure programmatically:
import { redisServer } from "@exulu/backend";
redisServer.host = "localhost";
redisServer.port = "6379";
redisServer.password = "your-password"; // Optional
Redis for production
For production environments:
# Managed Redis service
REDIS_HOST=redis-12345.cloud.redislabs.com
REDIS_PORT=12345
REDIS_PASSWORD=your-secure-password
Recommended providers:
- Redis Cloud
- AWS ElastiCache
- Google Cloud Memorystore
- Azure Cache for Redis
Best practices
Use dedicated queues
Create separate queues for different workload types:
ExuluQueues.register("embeddings", ...);
ExuluQueues.register("processing", ...);
ExuluQueues.register("sync", ...);
Benefits: independent scaling, better monitoring, isolated failures.
Match concurrency to resources
Consider available CPU, memory, and external service limits:
// 4 CPU cores → worker concurrency 4-8
// 8 GB RAM → adjust based on job memory
// API limit 10 req/sec → rate limit 9
Set realistic timeouts
Add buffer to expected duration:
// Expected: 30 seconds → Timeout: 60 seconds
// Expected: 2 minutes → Timeout: 5 minutes
Use exponential backoff
For transient failures, exponential backoff is more effective:
backoff: {
  type: "exponential",
  delay: 1000 // 1s, 2s, 4s, 8s, 16s
}
Monitor queue depth
If jobs pile up (high waiting count), increase concurrency or the rate limit:
const counts = await config.queue.getJobCounts();
if (counts.waiting > 1000) {
  // Consider increasing concurrency
}
Next steps