Singleton instance

ExuluQueues is exported as a singleton instance:
import { ExuluQueues } from "@exulu/backend";

// Use the singleton directly
const myQueue = ExuluQueues.register("my-queue", ...);

Methods

register()

Registers a new queue with specified configuration.
ExuluQueues.register(
  name: string,
  concurrency: { worker: number; queue: number },
  ratelimit?: number,
  timeoutInSeconds?: number
): {
  use: () => Promise<ExuluQueueConfig>
}

name (string, required): Unique queue name.
concurrency (object, required): Concurrency configuration with worker and queue levels.
concurrency.worker (number, required): Jobs per worker process.
concurrency.queue (number, required): Global maximum number of jobs across all workers.
ratelimit (number, default: 1): Maximum jobs per second.
timeoutInSeconds (number, default: 180): Job timeout in seconds.
Returns { use: () => Promise<ExuluQueueConfig> }: Object with a use() method that initializes the queue.

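
The two concurrency levels interact: each worker process runs up to concurrency.worker jobs at once, and concurrency.queue caps the total across all workers. The sketch below shows that arithmetic, assuming the global cap acts as a simple minimum. effectiveConcurrency and workerProcesses are illustrative names, not part of the Exulu API.

```typescript
// Illustrative helper (not part of @exulu/backend): upper bound on the number
// of jobs running at once for a given number of worker processes.
type Concurrency = { worker: number; queue: number };

function effectiveConcurrency(c: Concurrency, workerProcesses: number): number {
  // Each process can run up to c.worker jobs; the queue-level limit caps the total.
  return Math.min(c.queue, c.worker * workerProcesses);
}

console.log(effectiveConcurrency({ worker: 5, queue: 10 }, 1)); // 5
console.log(effectiveConcurrency({ worker: 5, queue: 10 }, 3)); // 10
```

With { worker: 5, queue: 10 }, a third worker process adds no extra parallelism under this assumption, because the global limit of 10 is already reached with two.
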
import { ExuluQueues } from "@exulu/backend";

// Register a queue
const embeddingsQueue = ExuluQueues.register(
  "embeddings",           // name
  {
    worker: 5,            // worker concurrency
    queue: 10             // queue concurrency
  },
  10,                     // rate limit: 10 jobs/sec
  300                     // timeout: 5 minutes
);

// Initialize the queue
const config = await embeddingsQueue.use();

// Add jobs
await config.queue.add("generate-embeddings", {
  contextId: "docs",
  itemId: "item-123"
});
register() only registers the queue configuration. Call .use() to actually create the BullMQ Queue instance.
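
The split between register() and use() can be pictured as lazy initialization. The following is a simplified in-memory model, not the Exulu implementation: FakeQueue stands in for a BullMQ Queue, and the caching of the created instance is an assumption that the source does not state.

```typescript
// Simplified model of "register now, create later".
// FakeQueue, register, and created are hypothetical names for illustration only.
class FakeQueue {
  name: string;
  constructor(name: string) {
    this.name = name;
  }
}

type FakeConfig = { queue: FakeQueue };
type Registration = { use: () => Promise<FakeConfig> };

const created: string[] = []; // records when a queue is actually instantiated

function register(name: string): Registration {
  let cached: FakeConfig | undefined;
  return {
    use: async () => {
      if (!cached) {
        created.push(name); // the expensive step happens on the first use() only
        cached = { queue: new FakeQueue(name) };
      }
      return cached;
    },
  };
}

const registration = register("embeddings");
console.log(created.length); // 0: registering alone creates nothing

registration.use().then((config) => {
  console.log(config.queue.name, created.length); // embeddings 1
});
```
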

queue()

Retrieves a queue configuration by name.
ExuluQueues.queue(name: string): {
  queue: Queue;
  ratelimit: number;
  timeoutInSeconds: number;
  concurrency: {
    worker: number;
    queue: number;
  };
} | undefined

name (string, required): Queue name to retrieve.
Returns QueueConfig | undefined: Queue configuration object, or undefined if not found.

// Get a previously registered queue
const config = ExuluQueues.queue("embeddings");

if (config) {
  console.log(config.queue.name);           // "embeddings"
  console.log(config.ratelimit);            // 10
  console.log(config.concurrency.worker);   // 5
  console.log(config.concurrency.queue);    // 10
}
Use this to access queues that were registered elsewhere in your application.
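
Because queue() can return undefined, callers that cannot proceed without the queue may prefer to fail fast. The sketch below is a small wrapper for that case. requireQueue and the Lookup type are hypothetical names, and the demo uses an in-memory object rather than the real singleton.

```typescript
// Structural stand-in for the registry: anything with a queue(name) lookup
// that may return undefined.
type Lookup<T> = { queue: (name: string) => T | undefined };

function requireQueue<T>(registry: Lookup<T>, name: string): T {
  const config = registry.queue(name);
  if (config === undefined) {
    throw new Error(`Queue "${name}" not found; register and initialize it first`);
  }
  return config;
}

// Demo with an in-memory registry instead of ExuluQueues.
const demoRegistry: Lookup<{ ratelimit: number }> = {
  queue: (name) => (name === "embeddings" ? { ratelimit: 10 } : undefined),
};

console.log(requireQueue(demoRegistry, "embeddings").ratelimit); // 10
```

With the real singleton this would be called as requireQueue(ExuluQueues, "embeddings"), assuming ExuluQueues.queue() matches the signature shown above.
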

Properties

queues

queues (QueueConfig[]): Array of all initialized queue configurations.

// Access all queues
ExuluQueues.queues.forEach(config => {
  console.log(`Queue: ${config.queue.name}`);
  console.log(`Rate limit: ${config.ratelimit} jobs/sec`);
  console.log(`Worker concurrency: ${config.concurrency.worker}`);
  console.log(`Queue concurrency: ${config.concurrency.queue}`);
  console.log(`Timeout: ${config.timeoutInSeconds}s`);
});

list

list (Map<string, QueueRegistration>): Map of registered queue configurations (before initialization).

// Access registered queues
const registration = ExuluQueues.list.get("embeddings");

if (registration) {
  console.log(registration.name);                    // "embeddings"
  console.log(registration.concurrency.worker);      // 5
  console.log(registration.concurrency.queue);       // 10
  console.log(registration.ratelimit);               // 10
  console.log(registration.timeoutInSeconds);        // 300

  // Initialize the queue
  const config = await registration.use();
}

// List all registered queues
for (const [name, registration] of ExuluQueues.list) {
  console.log(`Registered queue: ${name}`);
}

Queue configuration object

When you call .use(), you get an ExuluQueueConfig object:
type ExuluQueueConfig = {
  queue: Queue;              // BullMQ Queue instance
  ratelimit: number;         // Jobs per second
  timeoutInSeconds: number;  // Job timeout
  concurrency: {
    worker: number;          // Per-worker concurrency
    queue: number;           // Global concurrency
  };
  retries?: number;          // Optional: retry count
  backoff?: {                // Optional: backoff strategy
    type: "exponential" | "linear";
    delay: number;
  };
};
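
To make the backoff field concrete, the sketch below computes the wait before each retry. The exponential branch follows the usual doubling pattern (delay, 2x delay, 4x delay). The linear branch is an assumption (delay multiplied by the retry number), since the source does not define how "linear" is computed. backoffDelayMs is a hypothetical helper.

```typescript
type Backoff = { type: "exponential" | "linear"; delay: number };

// retry is 1 for the first retry, 2 for the second, and so on.
function backoffDelayMs(backoff: Backoff, retry: number): number {
  if (backoff.type === "exponential") {
    return backoff.delay * 2 ** (retry - 1); // 1x, 2x, 4x, ...
  }
  return backoff.delay * retry; // assumed linear growth: 1x, 2x, 3x, ...
}

const exponential: Backoff = { type: "exponential", delay: 2000 };
const linear: Backoff = { type: "linear", delay: 2000 };

console.log([1, 2, 3].map((n) => backoffDelayMs(exponential, n))); // [ 2000, 4000, 8000 ]
console.log([1, 2, 3].map((n) => backoffDelayMs(linear, n)));      // [ 2000, 4000, 6000 ]
```
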

Using the Queue instance

The queue property is a BullMQ Queue instance with full API access:
const config = await myQueue.use();

// Add jobs
await config.queue.add("job-name", { data: "..." });

// Add delayed jobs
await config.queue.add("job-name", { data: "..." }, {
  delay: 5000  // Delay 5 seconds
});

// Add scheduled jobs
await config.queue.add("job-name", { data: "..." }, {
  repeat: {
    pattern: "0 0 * * *"  // Daily at midnight
  }
});

// Get job counts
const counts = await config.queue.getJobCounts();
console.log(counts);
// e.g. { waiting: 5, active: 2, completed: 100, failed: 3 } (other states such as delayed and paused are reported as well)

// Get jobs
const waitingJobs = await config.queue.getWaiting();
const activeJobs = await config.queue.getActive();
const completedJobs = await config.queue.getCompleted();
const failedJobs = await config.queue.getFailed();

// Get the current global concurrency limit
const concurrency = await config.queue.getGlobalConcurrency();
console.log(`Global concurrency: ${concurrency}`);

// Update global concurrency
await config.queue.setGlobalConcurrency(20);

// Pause/resume queue
await config.queue.pause();
await config.queue.resume();

// Remove jobs
await config.queue.clean(24 * 3600 * 1000, 0, "completed");  // Remove completed jobs older than 24h

// Drain queue (remove all waiting jobs)
await config.queue.drain();

// Close queue
await config.queue.close();

Usage examples

Basic queue registration and usage

import { ExuluQueues } from "@exulu/backend";

// Register queue
const myQueue = ExuluQueues.register(
  "my-queue",
  { worker: 5, queue: 10 },
  10,
  180
);

// Initialize
const config = await myQueue.use();

// Add a job
await config.queue.add("process-item", {
  itemId: "123",
  action: "process"
});

// Add a job with options
await config.queue.add(
  "process-item",
  { itemId: "456" },
  {
    priority: 1,        // Highest priority (lower number = higher priority)
    delay: 5000,        // Delay 5 seconds
    attempts: 3,        // Up to 3 attempts in total (first run plus 2 retries)
    backoff: {
      type: "exponential",
      delay: 1000
    }
  }
);

With ExuluEmbedder

import { ExuluQueues, ExuluEmbedder, ExuluContext } from "@exulu/backend";

// Register queue for embeddings
const embeddingsQueue = ExuluQueues.register(
  "embeddings",
  { worker: 5, queue: 10 },
  10,   // 10 jobs/sec
  300   // 5 min timeout
);

// Create embedder with queue
const embedder = new ExuluEmbedder({
  id: "openai_embedder",
  name: "OpenAI Embeddings",
  provider: "openai",
  model: "text-embedding-3-small",
  vectorDimensions: 1536,
  queue: await embeddingsQueue.use()
});

// Embeddings are now queued automatically
const context = new ExuluContext({
  id: "docs",
  embedder: embedder,
  // ...
});

// `item` and `config` are placeholders defined elsewhere in your application
await context.createItem(item, config);
// Embedding generation is queued rather than blocking this call

With ExuluContext processor

import { ExuluQueues, ExuluContext } from "@exulu/backend";

// Register processing queue
const processingQueue = ExuluQueues.register(
  "document-processing",
  { worker: 3, queue: 5 },
  5,     // 5 jobs/sec
  600    // 10 min timeout
);

// Use in context processor
const context = new ExuluContext({
  id: "documents",
  processor: {
    name: "PDF Text Extractor",
    config: {
      queue: await processingQueue.use(),
      trigger: "onInsert",
      generateEmbeddings: true,
      timeoutInSeconds: 600,
      retries: 3,
      backoff: {
        type: "exponential",
        delay: 2000
      }
    },
    execute: async ({ item, utils }) => {
      const text = await utils.storage.extractText(item.document_s3key);
      return { ...item, content: text };
    }
  },
  fields: [
    { name: "title", type: "text", required: true },
    { name: "document", type: "file", allowedFileTypes: [".pdf"] },
    { name: "content", type: "longtext" }
  ],
  sources: []
});

With ExuluContext data source

import { ExuluQueues, ExuluContext } from "@exulu/backend";

// Register sync queue
const syncQueue = ExuluQueues.register(
  "github-sync",
  { worker: 1, queue: 1 },
  1,      // 1 job/sec
  1800    // 30 min timeout
);

// Use in context source
const context = new ExuluContext({
  id: "github_issues",
  sources: [{
    id: "github",
    name: "GitHub Issues Sync",
    description: "Syncs issues from GitHub repository",
    config: {
      schedule: "0 */6 * * *",  // Every 6 hours
      queue: await syncQueue.use(),
      retries: 3,
      backoff: {
        type: "exponential",
        delay: 2000
      }
    },
    execute: async ({ exuluConfig }) => {
      // fetchGitHubIssues() is a placeholder for your own GitHub API call
      const issues = await fetchGitHubIssues();
      return issues.map(issue => ({
        external_id: issue.id,
        name: issue.title,
        content: issue.body,
        metadata: { labels: issue.labels }
      }));
    }
  }],
  fields: [
    { name: "title", type: "text", required: true },
    { name: "content", type: "longtext", required: true },
    { name: "metadata", type: "json" }
  ]
});

With ExuluAgent workflows

import { ExuluQueues, ExuluAgent } from "@exulu/backend";
import { createOpenAI } from "@ai-sdk/openai";

// Register workflow queue
const workflowQueue = ExuluQueues.register(
  "agent-workflows",
  { worker: 10, queue: 20 },
  20,   // 20 jobs/sec
  300   // 5 min timeout
);

// Use in agent
const agent = new ExuluAgent({
  id: "assistant",
  name: "Assistant",
  type: "agent",
  description: "AI assistant with background workflows",
  provider: "openai",
  config: {
    name: "gpt-4o",
    model: {
      create: ({ apiKey }) => createOpenAI({ apiKey })("gpt-4o")
    },
    instructions: "You are a helpful assistant."
  },
  workflows: {
    enabled: true,
    queue: await workflowQueue.use()
  },
  capabilities: {
    text: true,
    images: [],
    files: [],
    audio: [],
    video: []
  }
});

Monitoring queue status

const config = await myQueue.use();

// Get job counts
const counts = await config.queue.getJobCounts();
console.log(`Waiting: ${counts.waiting}`);
console.log(`Active: ${counts.active}`);
console.log(`Completed: ${counts.completed}`);
console.log(`Failed: ${counts.failed}`);

// Get failed jobs
const failedJobs = await config.queue.getFailed();
for (const job of failedJobs) {
  console.log(`Job ${job.id} failed:`, job.failedReason);
  console.log(`Data:`, job.data);
  console.log(`Attempts: ${job.attemptsMade}/${job.opts.attempts}`);
}

// Get active jobs
const activeJobs = await config.queue.getActive();
console.log(`Currently processing ${activeJobs.length} jobs`);

// Check global concurrency
const concurrency = await config.queue.getGlobalConcurrency();
console.log(`Global concurrency: ${concurrency}`);
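
The counts above can be reduced to a couple of signals that are easy to alert on. The sketch below works on a plain counts object so it runs without Redis. assessQueue, QueueHealth, and the maxWaiting threshold are illustrative names and values, not part of the Exulu or BullMQ APIs.

```typescript
// Only the four states shown above are modeled here.
type JobCounts = { waiting: number; active: number; completed: number; failed: number };

type QueueHealth = { backlogged: boolean; failureRate: number };

function assessQueue(counts: JobCounts, maxWaiting: number): QueueHealth {
  const finished = counts.completed + counts.failed;
  return {
    backlogged: counts.waiting > maxWaiting,
    // Share of finished jobs that failed; 0 when nothing has finished yet.
    failureRate: finished === 0 ? 0 : counts.failed / finished,
  };
}

const health = assessQueue({ waiting: 5, active: 2, completed: 100, failed: 3 }, 50);
console.log(health.backlogged);                           // false
console.log((health.failureRate * 100).toFixed(1) + "%"); // 2.9%
```

Completed and failed counts only cover jobs that are still retained in the queue, so the failure rate is a rough indicator rather than an exact figure.
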

Managing queue lifecycle

const config = await myQueue.use();

// Pause processing (workers stop picking up new jobs)
await config.queue.pause();
console.log("Queue paused");

// Resume processing
await config.queue.resume();
console.log("Queue resumed");

// Clean old completed jobs
await config.queue.clean(
  24 * 3600 * 1000,  // Older than 24 hours
  100,                // Max 100 jobs
  "completed"         // Job type
);

// Clean old failed jobs
await config.queue.clean(
  7 * 24 * 3600 * 1000,  // Older than 7 days
  0,                      // All jobs
  "failed"
);

// Drain all waiting jobs
await config.queue.drain();
console.log("All waiting jobs removed");

// Close queue connection
await config.queue.close();
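
When several queues are open, shutdown code usually needs to close all of them even if one close() call fails. The sketch below is one way to do that, written against a minimal structural type so it runs on its own. closeAll and ClosableConfig are hypothetical names.

```typescript
// Minimal shape needed for shutdown: a named queue with an async close().
type ClosableConfig = { queue: { name: string; close: () => Promise<void> } };

// Closes every queue in turn and returns the names of those that failed to close.
async function closeAll(configs: ClosableConfig[]): Promise<string[]> {
  const failed: string[] = [];
  for (const config of configs) {
    try {
      await config.queue.close();
    } catch {
      failed.push(config.queue.name); // keep going so the other queues still close
    }
  }
  return failed;
}

// Demo with fake queues; the second one simulates a failing close().
const demoConfigs: ClosableConfig[] = [
  { queue: { name: "embeddings", close: async () => {} } },
  { queue: { name: "sync", close: async () => { throw new Error("connection lost"); } } },
];

closeAll(demoConfigs).then((failed) => console.log(failed)); // [ 'sync' ]
```

In an application this could be called with ExuluQueues.queues from a SIGTERM handler, assuming each entry exposes the BullMQ queue as shown in the type definitions.
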

Multiple queues

// Register multiple queues for different purposes
const embeddingsQueue = ExuluQueues.register(
  "embeddings",
  { worker: 5, queue: 10 },
  10,
  300
);

const processingQueue = ExuluQueues.register(
  "processing",
  { worker: 3, queue: 5 },
  5,
  600
);

const syncQueue = ExuluQueues.register(
  "sync",
  { worker: 1, queue: 1 },
  1,
  1800
);

// Initialize all queues
const [embeddingsConfig, processingConfig, syncConfig] = await Promise.all([
  embeddingsQueue.use(),
  processingQueue.use(),
  syncQueue.use()
]);

// Use them independently
await embeddingsConfig.queue.add("embed", { /* job data */ });
await processingConfig.queue.add("process", { /* job data */ });
await syncConfig.queue.add("sync", { /* job data */ });

// Monitor all queues
for (const queueConfig of ExuluQueues.queues) {
  const counts = await queueConfig.queue.getJobCounts();
  console.log(`${queueConfig.queue.name}:`, counts);
}

Retrieving registered queues

// Later in your code, retrieve a queue by name
const config = ExuluQueues.queue("embeddings");

if (config) {
  // Add a job to the existing queue
  await config.queue.add("new-job", { data: "..." });
}

// List all registered queues
for (const [name, registration] of ExuluQueues.list) {
  console.log(`Queue: ${name}`);
  console.log(`  Worker concurrency: ${registration.concurrency.worker}`);
  console.log(`  Queue concurrency: ${registration.concurrency.queue}`);
  console.log(`  Rate limit: ${registration.ratelimit} jobs/sec`);
  console.log(`  Timeout: ${registration.timeoutInSeconds}s`);
}

Type definitions

// Queue registration return type
type QueueRegistration = {
  use: () => Promise<ExuluQueueConfig>;
};

// Queue configuration after initialization
type ExuluQueueConfig = {
  queue: Queue;              // BullMQ Queue instance
  ratelimit: number;         // Jobs per second
  timeoutInSeconds: number;  // Job timeout
  concurrency: {
    worker: number;          // Per-worker concurrency
    queue: number;           // Global concurrency
  };
  retries?: number;          // Optional retry count
  backoff?: {                // Optional backoff strategy
    type: "exponential" | "linear";
    delay: number;           // Milliseconds
  };
};

// Queue config in ExuluQueues.queues array
type QueueConfig = {
  queue: Queue;
  ratelimit: number;
  concurrency: {
    worker: number;
    queue: number;
  };
  timeoutInSeconds: number;
};

Best practices

Initialize queues early: Call .use() during application startup to ensure queues are ready before they’re needed.
Reuse queue configurations: Register queues once and retrieve them with ExuluQueues.queue() when needed elsewhere.
Close queues on shutdown: Properly close queue connections when shutting down your application to prevent resource leaks.
Monitor queue depth: Regularly check job counts to ensure queues aren’t backing up. A persistently high waiting count means jobs arrive faster than they are processed; raise the rate limit or concurrency, or add workers.
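
For a rough sense of whether a backlog is a problem, the waiting count can be compared with the configured rate limit. The sketch below gives a lower-bound estimate only: it assumes the rate limit is the bottleneck and ignores job duration and concurrency limits. estimateDrainSeconds is a hypothetical helper.

```typescript
// Lower-bound estimate: seconds needed to start `waiting` jobs at `ratelimit` jobs/sec.
function estimateDrainSeconds(waiting: number, ratelimit: number): number {
  if (ratelimit <= 0) {
    throw new RangeError("ratelimit must be positive");
  }
  return Math.ceil(waiting / ratelimit);
}

console.log(estimateDrainSeconds(600, 10)); // 60
console.log(estimateDrainSeconds(25, 10));  // 3
```
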
