# @memberjunction/queue
A server-side queue management framework for MemberJunction applications that provides database-backed task persistence, concurrent processing, and heartbeat monitoring.
## Overview
The `@memberjunction/queue` package delivers a robust queuing system for background task processing. It manages task lifecycle from creation through execution, with automatic queue provisioning, configurable concurrency limits, and process-level health tracking.
```mermaid
graph TD
A["QueueManager
(Singleton)"] --> B["QueueBase
(Abstract)"]
B --> C["AIActionQueue"]
B --> D["EntityAIActionQueue"]
B --> E["Custom Queue
(Your Implementation)"]
A --> F["Queue Types
(Database Metadata)"]
A --> G["Queue Records
(Process Tracking)"]
B --> H["TaskBase
(Individual Tasks)"]
style A fill:#2d6a9f,stroke:#1a4971,color:#fff
style B fill:#7c5295,stroke:#563a6b,color:#fff
style C fill:#2d8659,stroke:#1a5c3a,color:#fff
style D fill:#2d8659,stroke:#1a5c3a,color:#fff
style E fill:#b8762f,stroke:#8a5722,color:#fff
style F fill:#2d6a9f,stroke:#1a4971,color:#fff
style G fill:#2d6a9f,stroke:#1a4971,color:#fff
style H fill:#7c5295,stroke:#563a6b,color:#fff
```
## Installation
```bash
npm install @memberjunction/queue
```
## Architecture
### Task Lifecycle
```mermaid
stateDiagram-v2
[*] --> Pending: Task Created
Pending --> InProgress: Queue Picks Up
InProgress --> Complete: ProcessTask Succeeds
InProgress --> Failed: ProcessTask Fails
Pending --> Cancelled: External Cancel
```
### Processing Flow
```mermaid
sequenceDiagram
participant Client
participant QM as QueueManager
participant QB as QueueBase
participant DB as Database
Client->>QM: AddTask(type, data, options)
QM->>QM: Find or create queue for type
QM->>DB: Save QueueTask record (Pending)
QM->>QB: AddTask(taskBase)
QB->>QB: ProcessTasks() loop (250ms interval)
QB->>QB: Check concurrency (max 3 tasks)
QB->>QB: StartTask(task)
QB->>QB: ProcessTask(task) [abstract]
QB->>DB: Update QueueTask status
QB-->>Client: TaskResult
```
## Core Components
### QueueManager
The `QueueManager` is a singleton that coordinates all active queues. It auto-creates queue instances per type and captures process-level metadata (PID, hostname, network interfaces) for monitoring.
```typescript
import { QueueManager } from '@memberjunction/queue';
// Initialize (typically at application startup)
await QueueManager.Config(contextUser);
// Add a task by queue type name
const task = await QueueManager.AddTask(
'Email Notification',
{ recipient: 'user@example.com', subject: 'Welcome' },
{ priority: 1 },
contextUser
);
if (task) {
console.log(`Task created: ${task.ID}`);
}
```
### QueueBase
Abstract base class for all queue implementations. Subclasses implement `ProcessTask()` to define task execution logic.
```typescript
import { QueueBase, TaskBase, TaskResult } from '@memberjunction/queue';
import { RegisterClass } from '@memberjunction/global';
import { UserInfo } from '@memberjunction/core';
@RegisterClass(QueueBase, 'Email Notification')
export class EmailNotificationQueue extends QueueBase {
protected async ProcessTask(
task: TaskBase,
contextUser: UserInfo
): Promise {
const { recipient, subject, body } = task.Data;
await sendEmail(recipient, subject, body);
return {
success: true,
userMessage: 'Email sent successfully',
output: { sentAt: new Date() },
exception: null
};
}
}
```
### TaskBase
Represents an individual task with its payload, options, and database-backed record.
| Property | Type | Description |
|----------|------|-------------|
| `ID` | `string` | Unique task identifier from database |
| `Status` | `TaskStatus` | Current status (Pending, InProgress, Complete, Failed, Cancelled) |
| `Data` | `object` | Task payload data |
| `Options` | `TaskOptions` | Configuration (e.g., priority) |
| `TaskRecord` | `QueueTaskEntity` | Underlying database entity |
### TaskResult
Returned by `ProcessTask()` to communicate outcome.
| Property | Type | Description |
|----------|------|-------------|
| `success` | `boolean` | Whether the task completed successfully |
| `userMessage` | `string` | Human-readable result message |
| `output` | `object` | Task output data |
| `exception` | `object` | Error details if failed |
## Built-in Queues
### AIActionQueue
Processes AI actions through the MemberJunction AI Engine.
```typescript
const task = await QueueManager.AddTask(
'AI Action',
{ actionName: 'GenerateText', prompt: 'Summarize this document' },
{},
contextUser
);
```
### EntityAIActionQueue
Processes entity-specific AI actions.
```typescript
const task = await QueueManager.AddTask(
'Entity AI Action',
{ entityName: 'Products', entityID: '123', actionName: 'GenerateDescription' },
{},
contextUser
);
```
## Configuration
Queue behavior is controlled through constructor parameters:
| Parameter | Default | Description |
|-----------|---------|-------------|
| `_maxTasks` | `3` | Maximum concurrent tasks per queue |
| `_checkInterval` | `250` | Polling interval in milliseconds |
## Database Schema
The queue system persists state across three tables:
| Table | Purpose |
|-------|---------|
| `__mj.QueueType` | Defines available queue types |
| `__mj.Queue` | Tracks active queue instances with process info and heartbeat |
| `__mj.QueueTask` | Stores individual tasks with status, data, and output |
## Dependencies
| Package | Purpose |
|---------|---------|
| `@memberjunction/core` | Entity management and metadata |
| `@memberjunction/global` | Class registration and global state |
| `@memberjunction/core-entities` | Queue and task entity types |
| `@memberjunction/ai` | AI functionality for built-in queues |
| `@memberjunction/aiengine` | AI Engine integration |
## License
ISC