# Job Queue System
## Overview
The Workglow job queue system (`@workglow/job-queue`) provides a robust, three-tier architecture for scheduling, executing, and monitoring asynchronous work. It is the backbone that drives AI task execution, embedding generation, and any long-running computation within the framework. The design separates concerns into three cooperating layers -- **Client**, **Server**, and **Worker** -- so that the same code can operate in-process with direct event forwarding or across process boundaries through storage-backed subscriptions.
Every job passes through a well-defined lifecycle (`PENDING -> PROCESSING -> COMPLETED | FAILED | DISABLED`) with built-in retry logic, progress reporting, abort handling, and pluggable rate limiting. The queue persists its state through the `IQueueStorage` interface, which has concrete implementations for SQLite, PostgreSQL, Supabase, IndexedDB, and in-memory backends.
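As a rough sketch of the lifecycle above, the allowed transitions can be written as a lookup table. The `JobStatus` union here, the choice of which states may move to `DISABLED`, and the retry rule in `statusAfterFailure` (re-queue to `PENDING` while attempts remain, otherwise `FAILED`) are assumptions for illustration, not the library's definitions. The server section below also mentions an `ABORTING` state, which this sketch leaves out.

```typescript
// Hypothetical mirror of the lifecycle states named above; the real
// JobStatus in @workglow/job-queue may have more members (e.g. ABORTING).
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "FAILED" | "DISABLED";

// Transitions this sketch allows. Terminal states have no outgoing edges.
const ALLOWED: Record<JobStatus, readonly JobStatus[]> = {
  PENDING: ["PROCESSING", "DISABLED"],
  PROCESSING: ["COMPLETED", "FAILED", "PENDING"], // PENDING here means "retry"
  COMPLETED: [],
  FAILED: [],
  DISABLED: [],
};

function canTransition(from: JobStatus, to: JobStatus): boolean {
  return ALLOWED[from].includes(to);
}

// Assumed retry rule: a failed attempt goes back to PENDING while attempts
// remain, and becomes FAILED once runAttempts reaches maxRetries.
function statusAfterFailure(runAttempts: number, maxRetries: number): JobStatus {
  return runAttempts < maxRetries ? "PENDING" : "FAILED";
}
```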
## Three-Tier Architecture
### Tier 1: JobQueueClient
The client is the public API surface that application code interacts with. It is responsible for submitting jobs, waiting for results, subscribing to progress updates, and aborting running work.
A client can operate in two modes:
1. **Attached (in-process):** The client calls `attach(server)` to register itself with a local `JobQueueServer`. Events flow directly from server to client through in-memory function calls, providing the lowest possible latency.
2. **Connected (cross-process):** The client calls `connect()` to subscribe to storage change notifications. This mode works when the server runs in a different process, a different machine, or even a serverless function -- any topology where both sides share the same backing storage.
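```typescript
import { JobQueueClient } from "@workglow/job-queue";

// `queueStorage` is any IQueueStorage implementation (SQLite, PostgreSQL,
// Supabase, IndexedDB, or in-memory) shared with the server.
const client = new JobQueueClient({
  storage: queueStorage,
  queueName: "embeddings",
});

// Choose ONE of the two modes.

// Mode 1, in-process: attach directly to a local JobQueueServer
client.attach(server);

// Mode 2, cross-process: connect via storage subscriptions instead
// client.connect();
```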
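To make the difference between the two modes concrete, here is a self-contained sketch in which a server calls attached clients directly, while connected clients only see what flows through storage. `FakeStorage`, `SketchServer`, `JobEvent`, and `Listener` are hypothetical stand-ins invented for this illustration; they are not the `@workglow/job-queue` API.

```typescript
// Hypothetical event shape; the library's real events carry more fields.
interface JobEvent {
  jobId: string;
  progress: number;
}
type Listener = (event: JobEvent) => void;

// Stand-in for shared storage that can push change notifications.
class FakeStorage {
  private subscribers: Listener[] = [];

  subscribe(listener: Listener): () => void {
    this.subscribers.push(listener);
    // Return an unsubscribe function.
    return () => {
      this.subscribers = this.subscribers.filter((s) => s !== listener);
    };
  }

  write(event: JobEvent): void {
    // A real backend would persist the row here, then fan out the change.
    for (const s of this.subscribers) s(event);
  }
}

// Stand-in server: persists every event and also calls attached clients directly.
class SketchServer {
  private readonly storage: FakeStorage;
  private readonly attached: Listener[] = [];

  constructor(storage: FakeStorage) {
    this.storage = storage;
  }

  attach(listener: Listener): void {
    this.attached.push(listener); // in-process: a direct function call, no I/O
  }

  emit(event: JobEvent): void {
    this.storage.write(event); // connected clients learn about it via storage
    for (const l of this.attached) l(event);
  }
}
```

In the real system the connected path goes through the storage backend, which is why the attached mode is the lower-latency option when client and server share a process.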
### Tier 2: JobQueueServer
The server is the coordinator. It owns a pool of workers, manages their lifecycle, aggregates statistics, and handles housekeeping tasks such as stuck-job recovery and TTL-based cleanup. When a server starts, it:
1. Fixes up orphaned jobs from previous runs (jobs stuck in PROCESSING or ABORTING that are not owned by any current worker).
2. Subscribes to storage change events so it can wake idle workers the moment new work arrives.
3. Starts all workers and begins the periodic cleanup loop.
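The orphan fix-up in step 1 can be sketched as a filter over job records plus a recovery policy. `JobRecord`, `findOrphans`, and `recoverOrphans` are hypothetical, and the policy (orphaned `PROCESSING` jobs go back to `PENDING`, orphaned `ABORTING` jobs become `FAILED` because an abort was already requested) is an assumption of this sketch, not the server's documented behavior.

```typescript
// Hypothetical minimal job record for this sketch.
interface JobRecord {
  id: string;
  status: "PENDING" | "PROCESSING" | "ABORTING" | "COMPLETED" | "FAILED";
  workerId: string | null;
}

// Orphans: jobs still marked in flight whose workerId does not belong to
// any worker in the current pool (or that were never stamped with one).
function findOrphans(jobs: readonly JobRecord[], activeWorkerIds: ReadonlySet<string>): JobRecord[] {
  return jobs.filter(
    (j) =>
      (j.status === "PROCESSING" || j.status === "ABORTING") &&
      (j.workerId === null || !activeWorkerIds.has(j.workerId)),
  );
}

// Assumed recovery policy: re-queue orphaned PROCESSING jobs and fail
// orphaned ABORTING jobs. Returns how many jobs were touched.
function recoverOrphans(jobs: readonly JobRecord[], activeWorkerIds: ReadonlySet<string>): number {
  const orphans = findOrphans(jobs, activeWorkerIds);
  for (const j of orphans) {
    j.status = j.status === "ABORTING" ? "FAILED" : "PENDING";
    j.workerId = null;
  }
  return orphans.length;
}
```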
The server also acts as an event bus, forwarding worker-level events to all attached clients.
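```typescript
import { JobQueueServer } from "@workglow/job-queue";

// `MyJob` is your Job subclass; `queueStorage` and `concurrencyLimiter`
// are created elsewhere.
const server = new JobQueueServer(MyJob, {
  storage: queueStorage,
  queueName: "embeddings",
  limiter: concurrencyLimiter, // consulted before a worker starts another job
  workerCount: 4, // size of the worker pool
  pollIntervalMs: 100, // fallback poll when no notify() arrives
  deleteAfterCompletionMs: 60_000, // TTL for completed jobs
  cleanupIntervalMs: 10_000, // how often the cleanup loop runs
});

await server.start();
```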
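The TTL cleanup driven by `deleteAfterCompletionMs` and `cleanupIntervalMs` reduces to a predicate evaluated on each pass. This sketch assumes a hypothetical `completedAt` timestamp (not listed in the property table) and assumes that only `COMPLETED` jobs expire; `StoredJob`, `isExpired`, and `cleanupPass` are illustrative names.

```typescript
// Hypothetical record shape; `completedAt` is an assumed timestamp field.
interface StoredJob {
  id: string;
  status: "PENDING" | "PROCESSING" | "COMPLETED" | "FAILED";
  completedAt: Date | null;
}

// True once a COMPLETED job has been finished for longer than the TTL.
// Assumption: FAILED jobs are kept so they can be inspected.
function isExpired(job: StoredJob, now: Date, deleteAfterCompletionMs: number): boolean {
  return (
    job.status === "COMPLETED" &&
    job.completedAt !== null &&
    now.getTime() - job.completedAt.getTime() > deleteAfterCompletionMs
  );
}

// One cleanup pass: returns the jobs that survive. A server would run this
// every cleanupIntervalMs and delete the expired rows from storage.
function cleanupPass(jobs: readonly StoredJob[], now: Date, deleteAfterCompletionMs: number): StoredJob[] {
  return jobs.filter((j) => !isExpired(j, now, deleteAfterCompletionMs));
}
```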
### Tier 3: JobQueueWorker
Workers are the execution engines. Each worker runs a tight `while` loop that checks the limiter, claims the next available job from storage via `storage.next(workerId)`, and dispatches it for execution. When no work is available, the worker sleeps until either a `notify()` call wakes it (pushed by the server when new work arrives or a slot frees up) or the poll interval expires as a fallback.
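The sleep described above (wake on `notify()` or give up after the poll interval, whichever comes first) can be sketched as a small helper. `Waker` and `WakeReason` are hypothetical names. Remembering a `notify()` that arrives while the worker is busy is a design choice of this sketch, so that the next `wait()` returns at once instead of missing the signal.

```typescript
type WakeReason = "notified" | "timeout";

class Waker {
  private wake: ((reason: WakeReason) => void) | null = null;
  private timer: ReturnType<typeof setTimeout> | undefined;
  private pendingNotify = false;

  // Resolves on the first of notify() or the poll-interval timeout.
  wait(pollIntervalMs: number): Promise<WakeReason> {
    if (this.pendingNotify) {
      // A notify() arrived while nobody was waiting: consume it now.
      this.pendingNotify = false;
      return Promise.resolve("notified");
    }
    return new Promise<WakeReason>((resolve) => {
      this.wake = resolve;
      this.timer = setTimeout(() => this.settle("timeout"), pollIntervalMs);
    });
  }

  notify(): void {
    if (this.wake) this.settle("notified");
    else this.pendingNotify = true; // remember it for the next wait()
  }

  private settle(reason: WakeReason): void {
    clearTimeout(this.timer);
    const resolve = this.wake;
    this.wake = null;
    this.timer = undefined;
    resolve?.(reason);
  }
}
```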
A single worker can run several jobs at once. The main loop does not await the `processSingleJob` call, so on the next iteration the worker can claim another job, provided the limiter approves.
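The non-awaited dispatch can be sketched with a counter-based limiter. `ConcurrencyLimiter`, `dispatchAvailable`, and the method names `canProceed`, `recordStart`, and `recordEnd` are hypothetical; the library's own limiter interface may differ. Jobs are plain strings to keep the example small.

```typescript
// Hypothetical limiter that caps the number of jobs in flight.
class ConcurrencyLimiter {
  private running = 0;
  private readonly max: number;

  constructor(max: number) {
    this.max = max;
  }

  canProceed(): boolean {
    return this.running < this.max;
  }
  recordStart(): void {
    this.running++;
  }
  recordEnd(): void {
    this.running--;
  }
}

// One pass of a worker loop: keep claiming while the limiter allows,
// start each job WITHOUT awaiting it, and return the in-flight promises.
function dispatchAvailable(
  queue: string[],
  limiter: ConcurrencyLimiter,
  run: (job: string) => Promise<void>,
): Promise<void>[] {
  const started: Promise<void>[] = [];
  while (limiter.canProceed() && queue.length > 0) {
    const job = queue.shift()!;
    limiter.recordStart();
    // Not awaited: the loop moves straight on to the next job.
    started.push(run(job).finally(() => limiter.recordEnd()));
  }
  return started;
}
```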
Each worker has a unique `workerId` (UUID by default, or a caller-provided persistent ID). This ID is written into the job record when the worker claims it, enabling the server to distinguish orphaned jobs from jobs actively being processed.
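Claiming can be sketched in memory. `QueuedJob` and `claimNext` are hypothetical stand-ins for a job row and `storage.next(workerId)`, and the ordering rule (earliest `runAfter` first among due `PENDING` jobs) is an assumption. In a shared database the select-and-update must be atomic, for example a conditional `UPDATE` inside a transaction, so that two workers can never claim the same row.

```typescript
// Hypothetical minimal job row for this sketch.
interface QueuedJob {
  id: number;
  status: "PENDING" | "PROCESSING" | "COMPLETED" | "FAILED";
  runAfter: Date;
  workerId: string | null;
}

// Pick the earliest due PENDING job, mark it PROCESSING, and stamp it with
// the claiming worker's ID. Returns undefined when nothing is due yet.
function claimNext(jobs: QueuedJob[], workerId: string, now: Date): QueuedJob | undefined {
  let best: QueuedJob | undefined;
  for (const j of jobs) {
    if (j.status !== "PENDING" || j.runAfter.getTime() > now.getTime()) continue;
    if (!best || j.runAfter.getTime() < best.runAfter.getTime()) best = j;
  }
  if (best) {
    best.status = "PROCESSING";
    best.workerId = workerId; // lets the server tell owned jobs from orphans
  }
  return best;
}
```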
## The Job Class
`Job` is the base class for all work units. Subclasses override the `execute` method to implement their logic.
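```typescript
import { AbortSignalJobError, Job } from "@workglow/job-queue";
import type { IJobExecuteContext } from "@workglow/job-queue";

// Illustrative payload types for this example.
interface EmbeddingInput {
  text: string;
}
interface EmbeddingOutput {
  vectors: number[][];
}

// Your embedding backend; declared here only so the example type-checks.
declare function generateEmbeddings(text: string): Promise<number[][]>;

class EmbeddingJob extends Job<EmbeddingInput, EmbeddingOutput> {
  async execute(input: EmbeddingInput, context: IJobExecuteContext): Promise<EmbeddingOutput> {
    // Bail out early if the job was aborted before work started
    if (context.signal.aborted) {
      throw new AbortSignalJobError("Aborted");
    }
    // Report progress (persisted to storage and forwarded to listeners)
    await context.updateProgress(50, "Generating embeddings...");
    const result = await generateEmbeddings(input.text);
    await context.updateProgress(100, "Done");
    return { vectors: result };
  }
}
```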
### Key Job Properties
| Property | Type | Description |
|---|---|---|
| `id` | `unknown` | Storage-assigned primary key |
| `jobRunId` | `string` | Groups related jobs into a single run |
| `queueName` | `string` | Name of the queue this job belongs to |
| `input` | `Input` | Serializable input payload |
| `output` | `Output \| null` | Result after completion |
| `status` | `JobStatus` | Current lifecycle state |
| `fingerprint` | `string` | Optional deduplication key |
| `maxRetries` | `number` | Maximum retry attempts (default: 10) |
| `runAfter` | `Date` | Earliest time the job may execute |
| `deadlineAt` | `Date \| null` | Hard deadline; job fails if exceeded |
| `runAttempts` | `number` | Number of times the job has been attempted |
| `progress` | `number` | Progress percentage (0-100) |
| `progressMessage` | `string` | Human-readable progress description |
| `workerId` | `string \| null` | ID of the worker that claimed this job |
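Several of these fields interact when an attempt fails. The sketch below shows one plausible decision rule under stated assumptions: `runAttempts` already includes the attempt that just failed, a passed `deadlineAt` fails the job outright, and retries are rescheduled with exponential backoff by moving `runAfter` forward. The backoff policy, `baseDelayMs`, `RetryState`, and the `afterFailure` helper are hypothetical, not the library's documented behavior.

```typescript
// Hypothetical subset of the job properties listed in the table.
interface RetryState {
  runAttempts: number;
  maxRetries: number;
  deadlineAt: Date | null;
}

type Outcome = { kind: "retry"; runAfter: Date } | { kind: "fail"; reason: string };

// Decide what happens after a failed attempt (assumed policy, see lead-in).
function afterFailure(job: RetryState, now: Date, baseDelayMs = 1_000): Outcome {
  if (job.deadlineAt !== null && now.getTime() >= job.deadlineAt.getTime()) {
    return { kind: "fail", reason: "deadline exceeded" };
  }
  if (job.runAttempts >= job.maxRetries) {
    return { kind: "fail", reason: "max retries reached" };
  }
  // Exponential backoff: 1x, 2x, 4x, ... the base delay.
  const delayMs = baseDelayMs * 2 ** Math.max(0, job.runAttempts - 1);
  return { kind: "retry", runAfter: new Date(now.getTime() + delayMs) };
}
```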
### IJobExecuteContext
The execution context passed to every `execute` call provides:
- **`signal: AbortSignal`** -- An abort signal that fires when the client calls `abort()` or the worker shuts down. Job implementations should check this signal periodically.
- **`updateProgress(progress, message?, details?)`** -- Async function that persists progress to storage and notifies listeners. Progress values are clamped to 0-100.
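A context with these two members can be assembled from an `AbortController` and a pair of callbacks. This is a guess at the shape for illustration, not the library's implementation: `ExecuteContext`, `makeContext`, `clampProgress`, and the `persist`/`notify` callbacks are hypothetical stand-ins for storage writes and event forwarding, and the optional `details` argument is accepted but ignored.

```typescript
// Hypothetical context shape mirroring the two members listed above.
interface ExecuteContext {
  signal: AbortSignal;
  updateProgress(progress: number, message?: string, details?: unknown): Promise<void>;
}

// Clamp progress into the 0-100 range.
function clampProgress(p: number): number {
  return Math.min(100, Math.max(0, p));
}

// updateProgress clamps, awaits the persist callback, then notifies listeners.
function makeContext(
  controller: AbortController,
  persist: (progress: number, message: string) => Promise<void>,
  notify: (progress: number, message: string) => void,
): ExecuteContext {
  return {
    signal: controller.signal,
    async updateProgress(progress: number, message = "") {
      const clamped = clampProgress(progress);
      await persist(clamped, message);
      notify(clamped, message);
    },
  };
}
```

Calling `controller.abort()` (for example when a client aborts or the worker shuts down) flips `signal.aborted`, which is what the `execute` example above checks.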
## JobHandle
When you submit a job, the client returns a `JobHandle