Skip to content

runmq/queue

Repository files navigation

RunMQ

RunMQ is a reliable message queue library for Node.js built on top of RabbitMQ. Supports async background processing and event-driven messaging for microservices, with automatic retries, schema validation, and DLQ.

RunMQ can be used to implement multiple messaging or jobs processing patterns:

  • Event Bus for event-driven architectures, where multiple services independently react to the same events
  • Job Queue for async background task processing

Features

  • Automatic Connection Management: Built-in retry logic with configurable attempts and delays
  • Message Processing with Retries: Automatic retry mechanism for failed messages with configurable retry delays
  • Dead Letter Queue (DLQ) Support: Failed messages automatically move to DLQ after exhausting retry attempts
  • Isolated Queues: Each processor maintains its own queue and DLQ, ensuring complete isolation between services
  • Schema Validation: Optional message validation using JSON Schema (AJV)
  • Concurrent Processing: Support for multiple concurrent consumers per queue
  • Correlation ID Support: Built-in correlation ID generation and tracking for distributed tracing
  • Custom Logging: Pluggable logging interface with default console logger

Installation

npm install runmq

Architecture Overview

RunMQ can be used to implement various messaging patterns. Here are two common architectures:

1. Event-Driven Architecture (Event Bus Pattern)

In this pattern, multiple processors (or services) subscribe to the same event topic. Each processor gets its own isolated queue and DLQ, enabling true microservices autonomy.

Publisher → Topic (user.created)
              ├→ Queue: emailService      → DLQ: emailService_dlq
              ├→ Queue: analyticsService  → DLQ: analyticsService_dlq
              └→ Queue: notificationService → DLQ: notificationService_dlq

Key Benefits:

  • Services remain independent and isolated
  • Each service can fail/retry without affecting others
  • Easy to add new services or processors by subscribing to existing topics
  • Scalable with multiple concurrent workers

2. Background Processing Pattern

RunMQ can also be used as a job queue for background processing tasks. A single worker service processes jobs from a dedicated queue with retries and DLQ support.

Publisher → Topic (email.send) → Queue: emailWorker → DLQ: emailWorker_dlq

Key Benefits:

  • Simple async job processing
  • Automatic retries for failed jobs
  • Scalable with multiple concurrent workers
  • Dead letter queue for failed jobs

Quick Start

Basic Setup

import { RunMQ } from 'runmq';

// 1. Initialize RunMQ
const runMQ = await RunMQ.start({
    url: 'amqp://localhost:5672',
    reconnectDelay: 5000,        // optional, default: 5000ms
    maxReconnectAttempts: 5      // optional, default: 5
});

// 2. Process messages (create a consumer)
await runMQ.process('user.created', {
    name: 'emailService',        // Unique processor name (creates isolated queue)
    consumersCount: 2,           // Number of concurrent workers
    attempts: 3,                 // Try processing a message up to 3 times
    attemptsDelay: 2000            // Wait 2 seconds between retries
}, async (message) => {
    // Your processing logic here
    console.log('Received:', message.message);
    await sendEmail(message.message);
});

// 3. Publish messages
runMQ.publish('user.created', {
    userId: '123',
    email: '[email protected]',
    name: 'John Doe'
});

// That's it! The message will be delivered to all processors subscribed to 'user.created'

Event-Driven Architecture Example

One of the most powerful patterns with RunMQ is the Event Bus pattern, where multiple services independently react to the same events. The main advantage is that each service has its own isolated queue and dead letter queue, allowing for true microservices autonomy Publishing a single message (event) results in multiple services receiving and processing it independently.

Scenario: User Registration System

When a user registers, multiple services need to react independently.

import { RunMQ, RunMQMessage } from 'runmq';

interface UserCreatedEvent {
  userId: string;
  email: string;
  name: string;
  createdAt: string;
}

// Initialize RunMQ in each service
const runMQ = await RunMQ.start({
  url: 'amqp://localhost:5672'
});

// ============================================
// SERVICE 1: Email Service
// ============================================
await runMQ.process<UserCreatedEvent>('user.created', {
  name: 'emailService',        // Creates queue: emailService
  consumersCount: 2,
  attempts: 3,
  attemptsDelay: 2000
}, async (message: RunMQMessage<UserCreatedEvent>) => {
  console.log(`[Email Service] Sending welcome email to ${message.message.email}`);
  await sendWelcomeEmail(message.message);
});

// ============================================
// SERVICE 2: Analytics Service
// ============================================
await runMQ.process<UserCreatedEvent>('user.created', {
  name: 'analyticsService',    // Creates queue: analyticsService
  consumersCount: 1,
  attempts: 3
}, async (message: RunMQMessage<UserCreatedEvent>) => {
  console.log(`[Analytics] Recording user registration for ${message.message.userId}`);
  await trackUserRegistration(message.message);
});

// ============================================
// SERVICE 3: Notification Service
// ============================================
await runMQ.process<UserCreatedEvent>('user.created', {
  name: 'notificationService', // Creates queue: notificationService
  consumersCount: 3,
  attempts: 5,
  attemptsDelay: 1000
}, async (message: RunMQMessage<UserCreatedEvent>) => {
  console.log(`[Notifications] Sending push notification to ${message.message.userId}`);
  await sendPushNotification(message.message);
});

// ============================================
// PUBLISHER: User Registration Handler
// ============================================
// When a user registers, publish one event
runMQ.publish('user.created', {
  userId: 'user-123',
  email: '[email protected]',
  name: 'John Doe',
  createdAt: new Date().toISOString()
});

// All three services receive the event independently!

Adding a New Processor

Want to add a new service? Just subscribe to existing events:

// NEW SERVICE 4: CRM Sync Service
await runMQ.process<UserCreatedEvent>('user.created', {
  name: 'crmSyncService',      // Creates new isolated queue
  consumersCount: 1,
  attempts: 3
}, async (message: RunMQMessage<UserCreatedEvent>) => {
  console.log(`[CRM] Syncing user to CRM: ${message.message.userId}`);
  await syncToCRM(message.message);
});

// This new service automatically receives all future user.created events
// No changes needed to existing services!

Job Queue Pattern Example

Scenario: Background Email Processing

The other common pattern is using RunMQ as a job queue for background processing tasks. Where there's a publisher queuing jobs, and a worker service processing them asynchronously with retries and DLQ support.

import { RunMQ, RunMQMessage } from 'runmq';

interface EmailJob {
  to: string;
  subject: string;
  body: string;
  attachments?: string[];
}

const runMQ = await RunMQ.start({
  url: 'amqp://localhost:5672'
});

// ============================================
// WORKER: Email Processing Service
// ============================================
await runMQ.process<EmailJob>('email.send', {
  name: 'emailWorker',         // Single queue for job processing
  consumersCount: 5,           // 5 concurrent workers
  attempts: 3,
  attemptsDelay: 5000,
  messageSchema: {
    type: 'ajv',
    schema: {
      type: 'object',
      properties: {
        to: { type: 'string', format: 'email' },
        subject: { type: 'string' },
        body: { type: 'string' },
        attachments: { 
          type: 'array', 
          items: { type: 'string' } 
        }
      },
      required: ['to', 'subject', 'body']
    },
    failureStrategy: 'dlq'
  }
}, async (message: RunMQMessage<EmailJob>) => {
  console.log(`[Worker] Sending email to ${message.message.to}`);
  
  await sendEmail({
    to: message.message.to,
    subject: message.message.subject,
    body: message.message.body,
    attachments: message.message.attachments
  });
  
  console.log(`[Worker] Email sent successfully to ${message.message.to}`);
});

// ============================================
// PUBLISHER: API Endpoint
// ============================================
// Your API can now queue emails for background processing
app.post('/api/send-email', async (req, res) => {
  const { to, subject, body } = req.body;
  
  // Queue the job - returns immediately
  runMQ.publish('email.send', {
    to,
    subject,
    body,
    attachments: []
  });
  
  res.json({ status: 'queued' });
});

Job Processing Flow

API Request → Publish Job → Queue (emailWorker)
                              ↓
                         5 Concurrent Workers
                              ↓
                      [Success] or [Try processing for 3 times]
                              ↓
                      [Final Failure] → DLQ (emailWorker_dlq)

Features in Detail

Schema Validation

RunMQ supports JSON schema validation to ensure message integrity, so only valid messages are passed to your processors Currently, only AJV is supported for schema validation, with a single failure strategy of sending invalid messages to the DLQ in the meantime. if the schema validation fails, the message is sent directly to the DLQ without being processed.

const orderSchema = {
  type: 'object',
  properties: {
    orderId: { type: 'string', pattern: '^ORD-[0-9]+$' },
    customerId: { type: 'string' },
    items: {
      type: 'array',
      minItems: 1,
      items: {
        type: 'object',
        properties: {
          sku: { type: 'string' },
          quantity: { type: 'number', minimum: 1 },
          price: { type: 'number', minimum: 0 }
        },
        required: ['sku', 'quantity', 'price']
      }
    },
    total: { type: 'number', minimum: 0 }
  },
  required: ['orderId', 'customerId', 'items', 'total']
};

await runMQ.process('order.placed', {
  name: 'orderProcessor',
  consumersCount: 3,
  attempts: 3,
  messageSchema: {
    type: 'ajv',
    schema: orderSchema,
    failureStrategy: 'dlq'  // Invalid messages go straight to DLQ
  }
}, async (message) => {
  // Message is guaranteed to be valid
  await processOrder(message.message);
});

Queue Isolation and Naming

Important: Each processor creates an isolated queue based on its name parameter:

  • Queue name: {processor.name}
  • DLQ name: {processor.name}_dlq

This ensures:

  • ✅ Processors can't interfere with each other
  • ✅ Each processor controls its own retry logic
  • ✅ Failed messages are isolated per processor
  • ✅ Easy to monitor and debug per-processor queues

Example:

// Creates queue: userEmailService and userEmailService_dlq
await runMQ.process('user.created', { name: 'userEmailService', ... }, handler);

// Creates queue: userAnalytics and userAnalytics_dlq
await runMQ.process('user.created', { name: 'userAnalytics', ... }, handler);

Custom Logger

The default loger uses console, but you can implement your own logger by implementing the RunMQLogger interface:

import { RunMQLogger } from 'runmq';

class CustomLogger implements RunMQLogger {
  log(message: string): void {
    // Your logging implementation
  }
  
  error(message: string, error?: any): void {
    // Your error logging implementation
  }
}

const runMQ = await RunMQ.start(config, new CustomLogger());

Configuration

Connection Configuration

interface RunMQConnectionConfig {
  url: string;                    // The URL of the RabbitMQ server.
  reconnectDelay?: number;        // The delay in milliseconds before attempting to reconnect after a disconnection (default: 5000)
  maxReconnectAttempts?: number;  // Maximum reconnection attempts (default: 5)
}

Processor Configuration

interface RunMQProcessorConfiguration {
  name: string;                   //  The name of the processor, used to create isolated queues for each processor.
  consumersCount: number;         // The number of concurrent consumers to run for this processor.
  attempts?: number;             // The maximum number attempts processing a message, default is 1 attempt.
  attemptsDelay?: number;           // The delay in milliseconds between attempts.
  messageSchema?: MessageSchema; // The schema configuration for message validation.
}

Message Schema Configuration

interface MessageSchema {
  type: 'ajv';                   // The type of schema used for validation (Currently only 'ajv').
  schema: any;                   // The schema definition of the chosen schemaType, used for validating messages.
  failureStrategy: 'dlq';        // The strategy to apply when schema validation fails (e.g., 'dlq').
}

Message Structure

interface RunMQMessageContent<T> {
  message: T;                    // Your message payload
  meta: {
    id: string;                  // The unique identifier of the message.
    publishedAt: number;         // The timestamp when the message was published.
    correlationId: string;       // The correlation identifier.
  }
}

License

MIT

About

RunMQ is a reliable message queue library for Node.js built on top of RabbitMQ Guarantees

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published