Queue registration

Register queues using the ExuluQueues.register() method:
ExuluQueues.register(
  name: string,
  concurrency: { worker: number; queue: number },
  ratelimit?: number,
  timeoutInSeconds?: number
)

Parameters

name
string
required
Unique queue name. Used to identify the queue and its workers.
ExuluQueues.register("embeddings", ...)
Queue names should be descriptive and unique across your application. Common patterns: embeddings, document-processing, data-sync, agent-workflows.

concurrency
object
required
Concurrency configuration with worker and queue levels
concurrency: {
  worker: number,  // Jobs per worker process
  queue: number    // Global max jobs across all workers
}

concurrency.worker
number
required
Number of jobs a single worker process can handle simultaneously
concurrency: {
  worker: 5  // Each worker handles 5 jobs at once
}
Guidelines:
  • CPU-bound tasks: 1-2 per CPU core
  • I/O-bound tasks: 5-20 per worker
  • Memory-intensive tasks: 1-5 per worker
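
These heuristics can be expressed as a small helper. A minimal sketch — the task categories, multipliers, and function name are illustrative assumptions drawn from the guidelines above, not an Exulu API:

```typescript
import * as os from "os";

type TaskProfile = "cpu" | "io" | "memory";

// Suggest a per-worker concurrency from the guidelines above.
// cpuCount defaults to the host's core count.
function suggestWorkerConcurrency(
  profile: TaskProfile,
  cpuCount: number = os.cpus().length
): number {
  switch (profile) {
    case "cpu":
      return Math.max(1, cpuCount * 2); // 1-2 per core (upper bound)
    case "io":
      return 10;                        // middle of the 5-20 range
    case "memory":
      return 3;                         // middle of the 1-5 range
  }
}
```

On a 4-core host, `suggestWorkerConcurrency("cpu")` would suggest 8; treat the result as a starting point to tune, not a hard rule.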

concurrency.queue
number
required
Maximum jobs processing globally across all workers
concurrency: {
  queue: 10  // Max 10 jobs across all workers
}
Guidelines:
  • API rate limits: Match your API’s concurrency limit
  • Database load: 5-50 depending on DB capacity
  • External service: Check service documentation

ratelimit
number
default: 1
Maximum jobs processed per second (default: 1)
ExuluQueues.register(
  "api-calls",
  { worker: 5, queue: 10 },
  10  // Process 10 jobs per second
)
Use cases:
  • API rate limits (e.g., OpenAI: 10 req/sec)
  • Cost control (limit LLM calls per second)
  • Database protection (prevent overload)

timeoutInSeconds
number
default: 180
Maximum execution time for a job in seconds (default: 180 = 3 minutes)
ExuluQueues.register(
  "long-running-tasks",
  { worker: 2, queue: 5 },
  1,
  600  // 10 minute timeout
)
Guidelines:
  • Quick tasks: 30-60 seconds
  • Document processing: 120-300 seconds
  • Data sync: 300-1800 seconds
  • Large embeddings: 600-3600 seconds

Configuration examples

Embeddings generation queue

import { ExuluQueues } from "@exulu/backend";

const embeddingsQueue = ExuluQueues.register(
  "embeddings",
  {
    worker: 5,   // 5 concurrent embeddings per worker
    queue: 10    // Max 10 embeddings globally
  },
  10,            // 10 jobs/second (OpenAI rate limit)
  300            // 5 minute timeout
);

// Use with ExuluEmbedder
const embedder = new ExuluEmbedder({
  id: "openai_embedder",
  name: "OpenAI Embeddings",
  provider: "openai",
  model: "text-embedding-3-small",
  vectorDimensions: 1536,
  queue: await embeddingsQueue.use()
});

Document processing queue

const processingQueue = ExuluQueues.register(
  "document-processing",
  {
    worker: 3,   // 3 documents per worker
    queue: 5     // Max 5 documents globally
  },
  5,             // 5 jobs/second
  600            // 10 minute timeout for large docs
);

// Use with ExuluContext processor
const context = new ExuluContext({
  id: "documents",
  processor: {
    name: "PDF Text Extractor",
    config: {
      queue: await processingQueue.use(),
      trigger: "onInsert",
      generateEmbeddings: true
    },
    execute: async ({ item, utils }) => {
      const text = await utils.storage.extractText(item.document_s3key);
      return { ...item, content: text };
    }
  }
});

Data sync queue

const syncQueue = ExuluQueues.register(
  "github-sync",
  {
    worker: 1,   // Sequential processing
    queue: 1     // One sync at a time
  },
  1,             // 1 job/second (no rush)
  1800           // 30 minute timeout
);

// Use with ExuluContext source
const context = new ExuluContext({
  id: "github_issues",
  sources: [{
    id: "github",
    name: "GitHub Issues Sync",
    description: "Syncs issues from GitHub",
    config: {
      schedule: "0 */6 * * *",  // Every 6 hours
      queue: await syncQueue.use(),
      retries: 3,
      backoff: {
        type: "exponential",
        delay: 2000
      }
    },
    execute: async ({ exuluConfig }) => {
      const issues = await fetchGitHubIssues();
      return issues.map(issue => ({
        external_id: issue.id,
        name: issue.title,
        content: issue.body
      }));
    }
  }]
});

Agent workflow queue

const workflowQueue = ExuluQueues.register(
  "agent-workflows",
  {
    worker: 10,   // 10 concurrent agent tasks per worker
    queue: 20     // Max 20 agent tasks globally
  },
  20,             // 20 jobs/second
  300             // 5 minute timeout
);

// Use with ExuluAgent
const agent = new ExuluAgent({
  id: "assistant",
  name: "Assistant",
  type: "agent",
  description: "AI assistant",
  provider: "openai",
  config: { /* ... */ },
  workflows: {
    enabled: true,
    queue: await workflowQueue.use()
  },
  capabilities: { text: true, images: [], files: [], audio: [], video: [] }
});

High-throughput queue

const highThroughputQueue = ExuluQueues.register(
  "bulk-operations",
  {
    worker: 20,   // 20 concurrent jobs per worker
    queue: 100    // Max 100 jobs globally
  },
  50,             // 50 jobs/second
  60              // 1 minute timeout
);

// For fast, lightweight operations
const bulkConfig = await highThroughputQueue.use();
await bulkConfig.queue.add("process-item", {
  itemId: "item-123"
});

Low-throughput, long-running queue

const batchQueue = ExuluQueues.register(
  "nightly-batch",
  {
    worker: 1,    // Sequential processing
    queue: 1      // One at a time
  },
  0.1,            // 1 job every 10 seconds
  7200            // 2 hour timeout
);

// For expensive, long-running batch jobs
const batchConfig = await batchQueue.use();
await batchConfig.queue.add("nightly-report", {
  date: new Date()
});

API-constrained queue

// OpenAI has rate limits: 10 req/sec for most tiers
const openaiQueue = ExuluQueues.register(
  "openai-api",
  {
    worker: 3,    // 3 concurrent requests per worker
    queue: 10     // Max 10 requests globally
  },
  9,              // 9 req/sec (under the limit)
  120             // 2 minute timeout
);

// Anthropic has different limits: 5 req/sec
const anthropicQueue = ExuluQueues.register(
  "anthropic-api",
  {
    worker: 2,
    queue: 5
  },
  4,              // 4 req/sec (under the limit)
  120
);

Queue configuration object

After registration, call .use() to get the queue configuration:
const myQueue = ExuluQueues.register("my-queue", ...);

const config = await myQueue.use();

// config has this structure:
type ExuluQueueConfig = {
  queue: Queue;              // BullMQ Queue instance
  ratelimit: number;         // Jobs per second
  timeoutInSeconds: number;  // Job timeout
  concurrency: {
    worker: number;          // Per-worker concurrency
    queue: number;           // Global concurrency
  };
  retries?: number;          // Retry count (optional)
  backoff?: {                // Backoff strategy (optional)
    type: "exponential" | "linear";
    delay: number;           // Delay in milliseconds
  };
};
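
One practical way to reason about this config: a queue's sustainable throughput is bounded by both its rate limit and how quickly its global concurrency slots turn over. A hypothetical helper over a subset of the `ExuluQueueConfig` shape above (the function name and the `avgJobSeconds` parameter are assumptions for illustration):

```typescript
// The subset of ExuluQueueConfig fields this calculation needs.
type ThroughputConfig = {
  ratelimit: number;               // jobs per second
  concurrency: { queue: number };  // global concurrency limit
};

// Jobs/second the queue can actually sustain: the rate limit, or the
// rate at which global concurrency slots free up, whichever is lower.
function effectiveThroughput(
  config: ThroughputConfig,
  avgJobSeconds: number
): number {
  return Math.min(config.ratelimit, config.concurrency.queue / avgJobSeconds);
}
```

For example, with `ratelimit: 10` and `concurrency.queue: 10`, jobs averaging 2 seconds cap throughput at 5 jobs/second regardless of the rate limit.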

Retry and backoff configuration

Configure retries and backoff in ExuluContext sources:
const context = new ExuluContext({
  id: "data",
  sources: [{
    id: "api-source",
    name: "API Data Source",
    description: "Fetches data from external API",
    config: {
      queue: await myQueue.use(),
      retries: 5,            // Retry up to 5 times
      backoff: {
        type: "exponential", // exponential or linear
        delay: 1000          // Start with 1 second delay
      }
    },
    execute: async () => {
      const data = await fetchFromAPI();
      return data;
    }
  }]
});

Exponential backoff

Delay doubles after each retry:
backoff: {
  type: "exponential",
  delay: 1000  // 1s, 2s, 4s, 8s, 16s, ...
}
Use for: Temporary failures, API rate limits, transient network issues

Linear backoff

Fixed delay between retries:
backoff: {
  type: "linear",
  delay: 5000  // 5s, 5s, 5s, 5s, ...
}
Use for: Predictable delays, scheduled retry windows
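
The two strategies can be sketched as a single delay-schedule function. This is an illustration of how BullMQ-style backoff delays grow, not Exulu's internal implementation:

```typescript
type BackoffType = "exponential" | "linear";

// Delay (ms) applied before retry attempt n (1-based).
function backoffDelay(
  type: BackoffType,
  delay: number,
  attempt: number
): number {
  return type === "exponential"
    ? delay * 2 ** (attempt - 1) // delay 1000 → 1s, 2s, 4s, 8s, ...
    : delay;                     // delay 5000 → 5s, 5s, 5s, ...
}
```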

Concurrency tuning

Start conservative

Begin with low concurrency and increase gradually:
// Starting point
ExuluQueues.register(
  "new-queue",
  { worker: 1, queue: 1 },  // Start with 1
  1,
  180
);

// After testing, increase
ExuluQueues.register(
  "new-queue",
  { worker: 5, queue: 10 },  // Scale up
  10,
  180
);

Monitor and adjust

Watch these metrics:
  • CPU usage: High CPU? Reduce worker concurrency
  • Memory usage: High memory? Reduce worker concurrency
  • Queue depth: Jobs piling up? Increase concurrency
  • Error rate: Many failures? Reduce concurrency or rate limit
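
These metrics can feed a simple tuning heuristic. A sketch under stated assumptions — the `Metrics` shape, thresholds, and function name are all illustrative, not Exulu APIs (in practice, the waiting count could come from BullMQ's `queue.getJobCounts()`):

```typescript
// Snapshot of the metrics listed above (shape is illustrative).
type Metrics = {
  cpuPercent: number; // worker CPU usage, 0-100
  waiting: number;    // queue depth (jobs waiting)
  errorRate: number;  // fraction of recent jobs that failed, 0-1
};

type Suggestion = "decrease" | "increase" | "hold";

// Thresholds are assumptions; tune them for your workload.
function suggestConcurrencyChange(m: Metrics): Suggestion {
  if (m.cpuPercent > 90 || m.errorRate > 0.1) return "decrease";
  if (m.waiting > 1000) return "increase";
  return "hold";
}
```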

Concurrency patterns

// Good for general-purpose queues
concurrency: {
  worker: 5,   // Moderate per-worker
  queue: 20    // Higher global limit
}
Multiple workers can run without hitting the global limit.

Rate limit strategies

API-based

Match your API provider’s limits:
// OpenAI Tier 4: 10,000 req/min = 166 req/sec
ExuluQueues.register("openai", { worker: 10, queue: 100 }, 150);

// Anthropic Pro: 5 req/sec
ExuluQueues.register("anthropic", { worker: 2, queue: 5 }, 4);

// Google Gemini: 60 req/min = 1 req/sec
ExuluQueues.register("gemini", { worker: 1, queue: 1 }, 0.9);

Cost-based

Control spending by limiting throughput:
// Limit expensive LLM calls
ExuluQueues.register(
  "gpt4-calls",
  { worker: 1, queue: 2 },
  0.5  // 0.5 jobs/sec = 1800 calls/hour
);

// Track costs:
// 1800 calls/hr × $0.03/call = $54/hr max
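
The cost arithmetic in the comment generalizes to a one-liner (the $0.03 per-call figure above is an example — check your provider's actual pricing):

```typescript
// Maximum hourly spend implied by a rate limit and a per-call cost.
function maxHourlyCost(rateLimitPerSec: number, costPerCall: number): number {
  return rateLimitPerSec * 3600 * costPerCall;
}

// 0.5 jobs/sec at $0.03/call → $54/hour worst case
```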

Database-based

Prevent database overload:
// PostgreSQL can handle ~100-200 concurrent connections
ExuluQueues.register(
  "db-queries",
  { worker: 10, queue: 50 },
  100  // 100 queries/sec
);

Timeout configuration

Set timeouts based on expected duration + buffer:
// Quick operations (embeddings)
ExuluQueues.register("embeddings", { ... }, 10, 60);  // 1 min

// Medium operations (document parsing)
ExuluQueues.register("parsing", { ... }, 5, 300);     // 5 min

// Long operations (data sync)
ExuluQueues.register("sync", { ... }, 1, 1800);       // 30 min

// Very long operations (large batch jobs)
ExuluQueues.register("batch", { ... }, 1, 7200);      // 2 hours
Jobs that exceed the timeout are terminated and marked as failed. Ensure timeouts are realistic for your workload.

Redis configuration

ExuluQueues requires Redis connection information:
# .env file
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password  # Optional
Or configure programmatically:
import { redisServer } from "@exulu/backend";

redisServer.host = "localhost";
redisServer.port = "6379";
redisServer.password = "your-password";  // Optional

Redis for production

For production environments:
# Managed Redis service
REDIS_HOST=redis-12345.cloud.redislabs.com
REDIS_PORT=12345
REDIS_PASSWORD=your-secure-password
Recommended providers:
  • Redis Cloud
  • AWS ElastiCache
  • Google Cloud Memorystore
  • Azure Cache for Redis

Best practices

Create separate queues for different workload types:
ExuluQueues.register("embeddings", ...);
ExuluQueues.register("processing", ...);
ExuluQueues.register("sync", ...);
Benefits: Independent scaling, better monitoring, isolated failures
Consider available CPU, memory, and external service limits:
// 4 CPU cores → worker concurrency 4-8
// 8 GB RAM → adjust based on job memory
// API limit 10 req/sec → rate limit 9
Add buffer to expected duration:
// Expected: 30 seconds → Timeout: 60 seconds
// Expected: 2 minutes → Timeout: 5 minutes
For transient failures, exponential backoff is more effective than a fixed delay:
backoff: {
  type: "exponential",
  delay: 1000  // 1s, 2s, 4s, 8s, 16s
}
If jobs pile up (high waiting count), increase concurrency or rate limit:
const counts = await config.queue.getJobCounts();
if (counts.waiting > 1000) {
  // Consider increasing concurrency
}
