# PgQue -- PgQ Universal Edition

- **Version:** 1.1
- **Date:** 2026-04-12
- **License:** Apache-2.0
- **Status:** Approved -- implementation-ready
- **Companion:** SPEC.md (v0.7.0-draft) contains the deep architectural analysis of PgQ internals -- rotation mechanics, snapshot isolation, batch_event_sql algorithm, dual-filter optimization, INHERITS justification, rotation state machine, subtransaction caveats, tick cleanup invariants. This document references SPEC.md for those topics rather than duplicating them.

**Two-layer architecture:** pgque is explicitly structured as two layers:

- **pgque-core:** Productization of PgQ (rename, modernize, pg_cron, security hardening, single-file install, health/metrics views). Low risk -- mechanical transformation of proven code.
- **pgque-api:** Modern convenience layer (send/receive/ack/nack, DLQ, delayed delivery, client SDKs). Higher risk -- new semantic surface area that must reduce cleanly to PgQ primitives.

## Changelog

| Version | Date | Description |
|---------|------|-------------|
| 0.1.0-draft | 2026-04-12 | Initial draft: repackaging thesis, what changes from PgQ, modern API layer, observability, client libraries, advanced patterns, implementation phases, source file inventory. |
| 0.2.0-draft | 2026-04-12 | Landscape comparison (28 systems across PG-native, external brokers, workflow engines, Python task queues; architectural comparison table; positioning rationale). Team/staffing with week-by-week Gantt. Risk table (11 risks). Best practices. Sprint-level implementation plan with deliverables and test plans. |
| 0.3.0-draft | 2026-04-12 | First review round. Rename pgqx to pgque (PgQ Universal Edition). Explicit two-layer architecture (pgque-core vs pgque-api). Fix receive() batch ownership trap (rename i_batch_size to i_max_return, document that ack processes entire batch). Fix nack() to accept retry_count parameter (avoid re-querying batch). Fix send_batch() to resolve queue/table once. Fix OTel counter/gauge semantics. Fix queue_health() edge cases. Soften "Exactly-once capable" to "Exactly-once capable (transactional pattern)". Resolve priority contradiction. Add VACUUM for delayed_events and dead_letter to maint(). Defer full OTel export architecture and Node/Ruby SDKs to v2. Align Sprint 5 with risk mitigation (Python + Go only). Document receive() rotation-blocking behavior. |
| 0.4.0-draft | 2026-04-12 | Second review round (3 reviewers). Add preliminary benchmark results (section 2.9, from NikolayS/pgq#1 -- quick-and-dirty laptop benchmark, needs repetition on server hardware). Update throughput claim from ~10-20k to ~86k ev/s (PL/pgSQL measured). Add PgQ code import strategy: git submodule + build/transform.sh (section 8.0). Fix `event_dead()` to accept event fields from caller instead of re-querying batch. Remove dead `qstate` lookup from `send_batch()`, leave TODO. Read `max_retries` from queue config instead of hardcoding 5 in `nack()`. Drop `peek()` from v1 scope. Fix `delayed_events` index (remove broken partial-index predicate with `now()`). Rename `send_at()` return type documentation to clarify it returns a scheduled-entry ID, not a queue event ID. Fix Node.js/Ruby class names (PgqxClient -> PgqueClient, Pgqx:: -> Pgque::). Fix CLI env var (PGQX_DSN -> PGQUE_DSN). Align Gantt with v1 scope (remove Node.js/Ruby from weeks 7-8). Add `queue_max_retries` column to `pgque.queue` table. Fix `queue_health()` to handle queues with no ticks. |
| 0.5.0-draft | 2026-04-12 | Third review round (3 approvals). Fix section 2.7 throughput claim (was still ~10-20k, now reflects benchmarks with `synchronous_commit=off` caveat). Add `sync_commit=off` caveat to section 2.5 comparison table. Combine `nack()` two lookups into single join. Add `queue_max_retries` column to schema definition (section 3.4.6.1). Document `ev_txid` NULL in DLQ (pgque.message does not carry txid). Correct RedPanda comparison units (MiB/s not Mbps). |
| 0.6.0-draft | 2026-04-12 | Add red/green TDD methodology (section 13.2). Add 10 user stories as acceptance tests (section 13.3): basic produce/consume, fan-out, retry+DLQ, delayed delivery, batch under load, rotation under lag, transactional exactly-once, managed PG install, observability, idempotent install. Tests serve both CI automation and manual/AI-agent verification. |
| 1.0.0 | 2026-04-12 | Spec approved. Three independent review rounds, all reviewers approved. |
| 1.1 | 2026-04-12 | Clarify US-3 retry cycle sequencing, unit vs integration test distinction in TDD section, US-10 idempotency implementation challenges. |

---

## 1. What pgque Is

pgque is a repackaging of [PgQ](https://github.com/pgq/pgq) (v3.5.1, ISC license) into a modern, extension-free PostgreSQL queue system with a simplified API and built-in observability.

**pgque is NOT a reimplementation.** PgQ already ships complete PL/pgSQL replacements for all its C code in `lowlevel_pl/`. The Makefile has a `make plonly` target that produces `pgq_pl_only.sql` -- a single concatenated file. There is a test (`sql/switch_plonly.sql`) that hot-swaps C for PL/pgSQL at runtime. The PL-only install (`structure/install_pl.sql`) is six lines:

```
\i structure/tables.sql
\i structure/func_internal.sql
\i lowlevel_pl/insert_event.sql
\i structure/func_public.sql
\i structure/triggers_pl.sql
\i structure/grants.sql
```

The substitution is exactly two components:

| C component | PL/pgSQL replacement | Lines |
|---|---|---|
| `lowlevel/pgq_lowlevel.sql` (C shared lib for event insertion) | `lowlevel_pl/insert_event.sql` | 60 |
| `structure/triggers.sql` (C trigger lib) | `lowlevel_pl/jsontriga.sql`, `logutriga.sql`, `sqltriga.sql` | 318 + 326 + 363 = 1,007 |

Everything else -- tables (225 lines), internal functions (32 include lines expanding to ~1,300 lines of function bodies), public functions (70 include lines expanding to ~1,600 lines), grants (13 lines) -- is IDENTICAL between C and PL-only installs.
Total PL-only source: ~4,028 lines across 40 files.

**pgque IS a productization.** It takes this proven, tested code and:

1. Repackages it as a single-file install (no `make`, no `CREATE EXTENSION`)
2. Renames to `pgque` schema (coexists with original PgQ)
3. Modernizes for PG14+ (`pg_snapshot` functions, `xid8` type)
4. Replaces `pgqd` daemon with `pg_cron` jobs
5. Adds a modern API layer (`send`/`receive`/`ack`/`nack`, DLQ, delayed delivery)
6. Adds observability (metrics views, OTel integration, health diagnostics)
7. Provides native client libraries for Python, Go, Node.js, Ruby

The positioning: "PgQ already works in pure SQL -- we packaged it so you can use it on managed databases, and added modern conveniences."

Installation: `\i pgque.sql` followed by `SELECT pgque.start()`.

### License

PgQ is ISC-licensed (copyright Marko Kreen, Skype Technologies OU). ISC is a permissive license functionally equivalent to MIT/BSD-2-Clause. pgque is licensed under **Apache-2.0**. The ISC license requires preserving the copyright notice and permission notice in all copies -- pgque includes PgQ's original copyright notice in its source headers.

---

## 2. Landscape Comparison

This section surveys the queue, job, and durable-execution ecosystem that pgque enters. The goal is not to declare winners but to make the architectural trade-offs visible so that a reviewer (or a prospective user) can judge where pgque fits and where it does not.

### 2.1 PostgreSQL-native queue systems

These systems run entirely inside PostgreSQL (or require only a PG extension). They are the most direct competitors to pgque.

| System | Stars | Language | Architecture | Key trait |
|---|---|---|---|---|
| **PgQ** (pgtools) | ~300 | PL/pgSQL + C | Snapshot-based batch isolation, TRUNCATE rotation, 3-table INHERITS | pgque's foundation -- battle-tested at Skype scale for 15+ years |
| **PGMQ** (Tembo) | ~4.8k | Rust ext + SQL | SKIP LOCKED + DELETE, visibility timeout, per-message ack. Also has SQL-only install. | SQS-like API. Dead tuple bloat under sustained load. |
| **River** | ~5k | Go | SKIP LOCKED, transactional enqueue, LISTEN/NOTIFY, unique jobs, COPY FROM for bulk | Go-native, excellent DX, ~46k jobs/sec. Go-only. |
| **graphile-worker** | ~2.2k | Node.js | SKIP LOCKED, LISTEN/NOTIFY, sub-3ms latency, batch jobs, cron | Fastest PG queue (~184k jobs/sec with batching). Node.js only. |
| **pg-boss** | ~3.4k | Node.js | SKIP LOCKED, polling, priority, scheduling, throttling, DLQ, partitioned archival | Feature-rich all-in-one. Node.js only. |
| **Oban** | ~3.9k | Elixir | SKIP LOCKED, LISTEN/NOTIFY, unique jobs, priorities 0-9. Pro: DAG workflows, rate limiting. | Elixir ecosystem standard. Pro is commercial. |
| **Que** | ~2.3k | Ruby | Advisory locks (not SKIP LOCKED) -- locks held in memory, no dead tuples from claiming | Avoids bloat via advisory locks. ~9.8k jobs/sec. Thin codebase (~1,200 lines). |
| **good_job** | ~3k | Ruby | Advisory locks via CTE, multi-threaded, cron, concurrency controls, dashboard | Most popular Ruby PG queue. CTE degrades above ~1M queued jobs. |
| **solid_queue** | ~2.4k | Ruby | FOR UPDATE SKIP LOCKED, separate hot/scheduled tables, supervisor architecture | Rails 8 default. Backed by 37signals production (HEY, Basecamp). ~20M jobs/day. |
| **Delayed::Job** | ~4.8k | Ruby | Polling (no SKIP LOCKED), YAML serialization, priority | Legacy (~376 jobs/sec). Still widely deployed. |
| **QueueClassic** | ~1.2k | Ruby | LISTEN/NOTIFY + SKIP LOCKED, JSON payloads | Simple, transactional enqueue. Lower adoption than alternatives. |
| **Procrastinate** | ~0.7k | Python | SKIP LOCKED + LISTEN/NOTIFY, pure PG, Django integration | Small but notable: pure-PG Python job queue. |

### 2.2 External broker systems (commonly compared)

Teams evaluating PostgreSQL-native queues often come from these systems, or are deciding whether to add a separate broker. Understanding what they trade away (and gain) is essential context.

| System | Stars | Backend | Architecture | Why teams consider PG queues instead |
|---|---|---|---|---|
| **Sidekiq** | ~13.5k | Redis | In-memory, multi-threaded Ruby workers, ~50k jobs/sec | No transactional enqueue with PG, Redis is another dependency |
| **Celery** | ~28k | Redis / RabbitMQ | Python distributed task queue, chains/groups/chords | Complex ops (broker + result backend), PG backend poorly maintained |
| **BullMQ** | ~8.7k | Redis | Node.js, Redis Streams, rate limiting, flows, multi-language clients | Redis dependency, no transactional enqueue |
| **Faktory** | ~6.1k | Redis | Language-agnostic Go server, any-language workers (by Sidekiq author) | Extra server + Redis, two dependencies |

The common thread: teams adopt PG-native queues to eliminate the Redis (or RabbitMQ) dependency and to get **transactional enqueue** -- the ability to insert an event and write application state in the same COMMIT, with rollback guaranteeing neither persists. No external broker can offer this.

### 2.3 Workflow / durable execution engines (different category)

These are not queues. They solve a fundamentally different problem -- multi-step, stateful, long-running processes with durable execution guarantees. They appear in queue comparisons because teams sometimes conflate "I need a queue" with "I need a workflow engine."

| System | Stars | Architecture | Why it is different | When to use pgque instead |
|---|---|---|---|---|
| **Temporal.io** | ~19.5k | Go server cluster, event sourcing, deterministic replay, SDKs in 6+ languages | Full workflow orchestration. Requires Temporal server + DB (Cassandra/MySQL/PG). Python SDK alone is ~170k lines. A team reported Temporal became a "barrier to adoption by Enterprise customers" due to operational burden (Nango migrated to a simple PG queue). | When you need a queue, not a workflow engine. pgque is one SQL file; Temporal is a distributed system. |
| **Restate.dev** | ~3.7k | Rust server, virtual objects, durable execution, custom replicated log | State machines with durable execution. Requires Restate server. More like "durable functions" than a queue. Lower ops burden than Temporal but still an external service. | When your workload is queue-shaped (produce event, consume event), not workflow-shaped (multi-step stateful processes). |
| **Inngest** | ~5.2k | Go server, event-driven step functions, serverless-compatible (Vercel, Netlify) | Event-triggered durable workflows. Can run serverless or self-hosted. Event-centric model (triggers, not just queues). | When you need simple event streaming, not step-function orchestration. |
| **Hatchet** | ~6.8k | Go server on PostgreSQL, DAG workflows, multi-tenant queue fairness, YC W24 | PG-native workflow engine with queue semantics. Bridges simple queues and full workflow engines. Dashboard, CLI, alerting built in. | When you need a queue primitive, not a workflow platform. Hatchet is closer to pgque's PG-native philosophy but adds significant complexity. |
| **Rivet.gg** | ~5.4k | Rust platform, actors + workflows, FoundationDB + V8 isolates | Platform for durable applications, actors, matchmaking. Gaming/real-time focus. Durable Objects model. | When you just need a PG queue, not an actor platform. |
| **Absurd** | ~1.6k | Pure PG (PL/pgSQL stored procedures), no extensions, pull-based scheduling | "Temporal but just Postgres." By Armin Ronacher (Flask, Rye). Single SQL file, tiny SDKs (~1.4k-1.9k LOC). Checkpointed step execution with resume on failure. Architecturally closest to pgque's PG-native philosophy in the workflow space. | If you need simple produce/consume, pgque. If you need step-by-step workflows with per-step checkpointing, consider Absurd. Both prove that PG-native, no-extension approaches work. |

### 2.4 Python-specific task systems

Python has a rich ecosystem of task/job systems, almost all Redis-backed.
Teams running PostgreSQL-heavy Python stacks often evaluate whether they can eliminate Redis entirely.

| System | Stars | Backend | Notes |
|---|---|---|---|
| **Dramatiq** | ~5.2k | Redis / RabbitMQ | Celery alternative, simpler API, better defaults. LGPL licensed. |
| **Huey** | ~5.9k | Redis / SQLite / in-memory | Lightweight, multiple backends, good for smaller projects. No PG backend. |
| **RQ (Redis Queue)** | ~10.6k | Redis | Extremely simple. Fork-per-job (~10x slower than Dramatiq/Huey). |
| **ARQ** | ~2.9k | Redis | Async-native (asyncio), by Pydantic creator. |
| **SAQ** | ~0.5k | Redis | ARQ-inspired, Redis Streams, sub-5ms latency. |
| **TaskTiger** | ~1.3k | Redis | Unique tasks, scheduled tasks, reliable locking. |

pgque with `pgque-py` (section 6.2) gives Python teams a PG-native alternative that requires no Redis, supports transactional enqueue, and avoids MVCC bloat.

### 2.5 Key architectural comparison

This table compares pgque against the most-adopted PostgreSQL-native queue systems across the features that matter for production operations.

| Feature | pgque | PGMQ | River | graphile-worker | pg-boss | Oban | solid_queue |
|---|---|---|---|---|---|---|---|
| Claim mechanism | Snapshot isolation (lockless) | SKIP LOCKED | SKIP LOCKED | SKIP LOCKED | SKIP LOCKED | SKIP LOCKED | FOR UPDATE + SKIP LOCKED |
| Table bloat under sustained load | None (TRUNCATE rotation) | Yes (DELETE + VACUUM) | Yes (UPDATE/DELETE) | Yes (UPDATE/DELETE) | Mitigated (partitioned archival) | Yes (UPDATE/DELETE) | Yes (DELETE) |
| Delivery semantics | At-least-once (batch) | At-least-once (per-msg) | At-least-once | At-least-once | At-least-once | At-least-once | At-least-once |
| Exactly-once capable | Yes (transactional ack pattern -- see 7.1) | No | Yes (transactional) | No | No | No | No |
| Batch processing | Native (tick-bounded) | Manual | Manual | Manual batching | No | No | No |
| DLQ | Built-in (v1) | Via archival | No | No | Built-in | Via plugin | Via Solid Queue API |
| CDC triggers | jsontriga (v1) | No | No | No | No | No | No |
| Multiple consumers | Built-in | Manual | No | No | No | Via queues | Via queues |
| Requires C extension | No | Yes (Rust ext) | No (Go binary) | No | No | No | No |
| Language-agnostic | Yes (SQL API) | Yes (SQL API) | Go only | Node.js only | Node.js only | Elixir only | Ruby only |
| Managed PG compatible | Yes | Depends (needs ext) | Yes | Yes | Yes | Yes | Yes |
| Latency (typical) | 1-2s (tick interval) | Sub-100ms | Sub-100ms | Sub-3ms | ~1s | ~1s | ~1s |
| Throughput | ~86k ev/s single-TX, ~164k batched (PL/pgSQL, `sync_commit=off` per-session; see 2.9) | ~30k msg/sec read | ~46k jobs/sec | ~184k jobs/sec | ~10k jobs/sec | ~20k jobs/sec | Not published |
| Battle-tested | 15+ years (Skype/MS) | ~2 years | ~2 years | ~5 years | ~5 years | ~5 years | ~1 year |

**Reading the throughput column:** These numbers come from different benchmarks on different hardware and are not directly comparable. They indicate order of magnitude.
pgque's lower raw throughput is offset by zero maintenance overhead under sustained load -- the systems that benchmark higher in bursts degrade as dead tuples accumulate (see the Brandur/PlanetScale MVCC analysis in SPEC.md section 10.2).

### 2.6 Why pgque

Given the landscape, pgque's value proposition rests on six pillars:

1. **Zero bloat under sustained load.** pgque is the only PostgreSQL queue system that uses TRUNCATE rotation instead of DELETE for event lifecycle. Every SKIP LOCKED queue eventually hits the Brandur/PlanetScale dead tuple wall -- where a single long-running transaction pins the MVCC horizon and causes index scan degradation, job backlog growth, and a positive feedback loop that only manual VACUUM or downtime resolves. pgque is structurally immune. Events are never individually DELETEd. Entire tables are cleared via TRUNCATE (DDL, not DML -- no dead tuples, no MVCC visibility checks).

2. **Language-agnostic.** pgque's API is pure SQL. Any language that can execute `SELECT pgque.send(...)` can produce events. Any language that can execute `SELECT * FROM pgque.receive(...)` can consume them. Client libraries (Python, Go, Node.js, Ruby) are convenience wrappers, not requirements. This is unique among modern PG queues -- River is Go-only, graphile-worker is Node.js-only, Oban is Elixir-only, solid_queue is Ruby-only.

3. **No extra infrastructure.** One SQL file. No Redis, no separate server process, no C extension to compile, no `shared_preload_libraries`, no server restart. `\i pgque.sql` and you have a production queue. pg_cron (pre-installed on every major managed provider) handles the ticker.

4. **Battle-tested core.** PgQ has run at Skype/Microsoft scale for 15+ years, processing billions of events. pgque is not a prototype or a weekend project -- it is a repackaging of proven code with modern conveniences added on top.

5. **Managed PG compatible.** Runs on RDS, Aurora, AlloyDB, Cloud SQL, Supabase, Neon, Crunchy Bridge without modifications.
No extension installation required. No DBA needed.

6. **Batch-oriented.** Natural fit for ETL, CDC, analytics pipelines, and any workload where processing N events together is more efficient than processing them one at a time. Tick-bounded batches give consumers a consistent, snapshot-isolated set of events to work with.

### 2.7 When NOT to use pgque

pgque is not the right choice for every queue workload. Be honest about the trade-offs:

- **Sub-10ms latency requirements.** pgque's tick-based architecture means typical latency is 1-2 seconds (reducible to ~100ms with LISTEN/NOTIFY wakeup). If you need sub-10ms job dispatch, graphile-worker or direct LISTEN/NOTIFY is a better fit.
- **100k+ jobs/sec sustained throughput.** Preliminary benchmarks (section 2.9) show ~86k ev/s with `synchronous_commit=off` (settable per-session or per-transaction -- safe for queue workloads even when the global setting is `on`, since at worst the last few ms of committed events are lost on crash). With `synchronous_commit=on`, expect lower numbers. If you need 100k+ jobs/sec sustained, you likely need a dedicated broker (Kafka, RedPanda) or a C-level extension.
- **Complex multi-step workflows with branching.** If your workload is "step A then step B, but if B fails retry B three times, then escalate to step C" -- that is a workflow engine problem, not a queue problem. Use Temporal, Restate, or Absurd.
- **You are already all-in on one ecosystem.** If your team is pure Go, River gives you better DX. Pure Elixir, Oban is the standard. Rails 8, solid_queue is built in. pgque's value is strongest when you have a polyglot stack or when you need the zero-bloat guarantee.

### 2.8 Architectural trade-off summary

Three fundamentally different approaches exist for PG-native job claiming:

**SKIP LOCKED systems** (PGMQ, River, graphile-worker, pg-boss, Oban, solid_queue): Fast per-job claiming, but every completed job creates a dead tuple via UPDATE or DELETE.
Under sustained high throughput, VACUUM cannot keep up, index scans degrade, and throughput enters a death spiral. This is the core problem documented by Brandur Leach (2015) and validated by PlanetScale (2026) -- see SPEC.md section 10.2.

**Advisory lock systems** (Que, good_job): Avoid the dead-tuple problem -- locks are held in memory, no row UPDATE needed to claim a job. But advisory lock tables have their own contention limits at extreme concurrency, and CTE-based lock acquisition degrades above ~1M queued jobs.

**Snapshot-based batch isolation + TRUNCATE rotation** (PgQ, pgque): Zero bloat by design. No per-job locking, no dead tuples, no VACUUM pressure on event tables. The trade-off is batch-oriented consumption (not per-job) and ticker-driven latency (1-2 seconds, not sub-3ms).

### 2.9 Preliminary benchmark results

A quick-and-dirty benchmark was run on a laptop (Apple Silicon, 10 cores, 24 GiB RAM, APFS SSD, PostgreSQL 18.3). **These numbers are preliminary and will need to be repeated on proper server hardware with controlled conditions.** Full details, methodology, and raw data: [NikolayS/pgq#1](https://github.com/NikolayS/pgq/issues/1).

Key findings (PgQ v3.5.1, tuned config: `synchronous_commit=off` -- can be set per-session/per-TX for queue workloads only; `shared_buffers=4GB`, `max_wal_size=8GB`, `wal_level=minimal`):

| Scenario | Throughput | Per core |
|---|---|---|
| C mode, single insert/TX, ~100 B, 16 clients | 117,924 ev/s | ~11.8k ev/s |
| **PL/pgSQL mode, single insert/TX, ~100 B, 16 clients** | **85,836 ev/s** | **~8.6k ev/s** |
| C mode, batched 1000/TX, ~100 B, 16 clients | 417,414 ev/s | ~41.7k ev/s |
| C mode, batched 1000/TX, ~2 KiB, 16 clients | 257,179 ev/s (479 MiB/s) | ~25.7k ev/s (~47.9 MiB/s) |
| C mode, batched 1000/TX, 30-min sustained, ~2 KiB (70 ckpts) | 163,940 ev/s (301 MiB/s avg) | ~16.4k ev/s (~30.1 MiB/s) |
| **PL/pgSQL mode, batched 100k/TX, ~100 B, 1 client** | **80,515 ev/s** | **~8.1k ev/s** |
| **PL/pgSQL mode, batched 100k/TX, ~2 KiB, 1 client** | **48,899 ev/s (~91.5 MiB/s)** | **~4.9k ev/s (~9.2 MiB/s)** |
| Consumer read rate, 100k batch, ~100 B | ~2.4M ev/s | ~240k ev/s |
| Consumer read rate, 100k batch, ~2 KiB | ~305k ev/s (568 MiB/s) | ~30.5k ev/s (~56.8 MiB/s) |

Per-core numbers assume all 10 cores are utilized (Apple Silicon, mixed P/E cores). Actual per-core throughput on server hardware with uniform cores may differ.

These per-core figures enable direct comparison with systems like RedPanda (~100 MiB/s per core claimed). PgQ sustained ~30.1 MiB/s per core -- roughly **1/3 of RedPanda's per-core claim**, but with full ACID transactions, transactional batch isolation, and zero bloat under sustained load. The ~3x gap is far from the "1-2 orders of magnitude" sometimes claimed for PG-based queues vs. dedicated brokers. On server-grade NVMe (where I/O was 57% of wait time on this laptop SSD), the gap would narrow further.

The PL/pgSQL rows are the most relevant for pgque -- they show the throughput ceiling for the no-C-extension mode that pgque will use.
At ~8.6k ev/s per core for single-insert-per-TX, PgQ's PL/pgSQL mode is competitive with C-based alternatives, especially considering that it produces zero dead tuples under sustained load.

Notable observations from the benchmark:

- **Tuning matters more than C vs. PL/pgSQL.** PL/pgSQL tuned (86k ev/s) beats C untuned (52k ev/s).
- **Batching matters most.** 1000 inserts/TX reaches 417k ev/s -- 3.6x over single-insert-per-TX.
- **Consumer is never the bottleneck.** Reading events is 3-6x faster than writing them.
- **Checkpoints cause dips but not collapse.** Sustained throughput over 70 checkpoints (30 min) averaged 301 MiB/s with no degradation over time.
- **Storage is the bottleneck.** pg_ash showed 57% of time spent on `IO:DataFileWrite` -- on server-grade NVMe, throughput would scale higher.

**Caveat:** This is a quick-and-dirty benchmark on a laptop. The numbers are indicative, not authoritative. A proper benchmark must be run on server hardware with controlled conditions, multiple runs, and statistical analysis. See Sprint 6 for the planned benchmark methodology.

---

## 3. Why

### 3.1 The managed-database problem

PgQ requires installing two custom C shared libraries (`pgq_lowlevel.so`, `pgq_triggers.so`) and an external daemon (`pgqd`). This makes it unusable on:

- **Amazon RDS / Aurora** -- no custom C extensions
- **Google Cloud SQL** -- no custom C extensions
- **AlloyDB** -- no custom C extensions
- **Azure Flexible Server** -- limited extension allowlist, pg_cron permissions more constrained
- **Supabase, Neon, Crunchy Bridge** -- curated extension catalogs
- **Any environment** where the DBA cannot (or will not) install C code

These are now the majority of PostgreSQL deployments. PgQ's architecture -- designed in the Skype era when everyone ran self-hosted Postgres -- locks it out of the modern ecosystem.

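What a given server actually permits can be checked from SQL before committing to an install path. The query below is a minimal sketch that uses only the standard `pg_available_extensions` catalog view. The extension names it filters on (`pgq`, `pg_cron`) are the ones this document refers to, and the result depends entirely on the provider.

```sql
-- Which of the relevant extensions does this server offer, and are they installed?
-- An extension that is missing from the result cannot be created here at all.
select name,
       default_version,
       installed_version   -- NULL means available but not yet created
from pg_catalog.pg_available_extensions
where name in ('pgq', 'pg_cron')
order by name;
```

On a typical managed service one would expect `pgq` to be absent, which is the situation pgque is designed for.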
**pg_cron availability by provider:**

| Provider | pg_cron available | Notes |
|----------|------------------|-------|
| Amazon RDS / Aurora | Yes | Supported since RDS PG 12.5 |
| Google Cloud SQL | Yes | Supported, requires flag |
| AlloyDB | Yes | Supported |
| Azure Flexible Server | Yes | More constrained permissions model |
| Supabase | Yes | Pre-installed |
| Neon | Yes | Jobs only run when compute is active (no scale-to-zero) |
| Crunchy Bridge | Yes | Supported |
| Self-hosted | Yes | Must install separately |

Where pg_cron is unavailable or unsuitable (e.g., serverless scale-to-zero), pgque works with any external scheduler calling `pgque.ticker()` and `pgque.maint()` via `psql` or a database connection.

### 3.2 The daemon problem

PgQ requires `pgqd`, an external C daemon, to generate ticks and run maintenance. This means:

- Another process to deploy, monitor, and restart
- Another failure mode (daemon dies → ticks stop → consumers stall)
- Container/Kubernetes complexity (sidecar? separate deployment?)
- No option on managed databases where you can't run custom daemons

pg_cron (supported on the major providers listed in section 3.1, with provider-specific constraints) eliminates this entirely. The ticker and maintenance run as scheduled SQL inside the database.

### 3.3 The proven path

PgQ already ships complete PL/pgSQL replacements for all C code in `lowlevel_pl/`. The switch is toggled by `sql/switch_plonly.sql`:

```sql
-- switch_plonly.sql: switches from C to PL/pgSQL implementations
\i lowlevel_pl/insert_event.sql
\i lowlevel_pl/jsontriga.sql
\i lowlevel_pl/logutriga.sql
\i lowlevel_pl/sqltriga.sql
```

This means:

1. PgQ's authors already validated PL/pgSQL as a correct replacement for C
2. The PL/pgSQL code has been in the PgQ repo since v3.2 (2012)
3. pgque does not invent new queue logic -- it repackages proven code

### 3.4 What changes from PgQ to pgque

#### 3.4.1 Rename `pgq` to `pgque`

All schema objects move from the `pgq` schema to `pgque`.
This is a mechanical search-and-replace across ~40 source files with no behavioral change.

#### 3.4.2 Replace `txid_*` with `pg_*` snapshot functions

PostgreSQL 13+ introduced `pg_snapshot` functions that replace the older `txid_*` family:

| PgQ (deprecated) | pgque (modern) | Notes |
|---|---|---|
| `txid_current()` | `pg_current_xact_id()` | Returns `xid8` not `bigint` |
| `txid_current_snapshot()` | `pg_current_snapshot()` | Returns `pg_snapshot` |
| `txid_snapshot_xmax()` | `pg_snapshot_xmax()` | |
| `txid_snapshot_xmin()` | `pg_snapshot_xmin()` | |
| `txid_snapshot_xip()` | `pg_snapshot_xip()` | |
| `txid_visible_in_snapshot()` | `pg_visible_in_snapshot()` | |
| `txid_snapshot` type | `pg_snapshot` type | |

Concrete changes in the schema:

```sql
-- PgQ (tables.sql):
queue_switch_step1 bigint not null default txid_current(),
queue_switch_step2 bigint default txid_current(),
tick_snapshot txid_snapshot not null default txid_current_snapshot(),
ev_txid bigint not null default txid_current(),

-- pgque:
queue_switch_step1 xid8 not null default pg_current_xact_id(),
queue_switch_step2 xid8 default pg_current_xact_id(),
tick_snapshot pg_snapshot not null default pg_current_snapshot(),
ev_txid xid8 not null default pg_current_xact_id(),
```

The `xid8` type avoids the `bigint` -> `xid8` cast overhead on the hot insert path. `pg_visible_in_snapshot()` accepts `xid8` natively.

Functions affected (each has `txid_*` calls that become `pg_*`):

- `pgque.ticker()` -- `txid_snapshot_xmax()`, `txid_current()`
- `pgque.batch_event_sql()` -- `txid_snapshot_xmax()`, `txid_visible_in_snapshot()`, `txid_snapshot_xip()`
- `pgque.maint_rotate_tables_step1()` -- `txid_current()`, `txid_snapshot_xmin()`
- `pgque.maint_rotate_tables_step2()` -- `txid_current()`
- `pgque.insert_event_raw()` -- implicit via `ev_txid` default

#### 3.4.3 Replace `pgqd` with `pg_cron`

PgQ requires `pgqd`, an external C daemon.
pgque replaces it with two `pg_cron` jobs created by `pgque.start()`:

```sql
-- Ticker: every 2 seconds (pg_cron >= 1.5 required for sub-minute scheduling)
SELECT cron.schedule_in_database(
    'pgque_ticker',
    '2 seconds',
    $$SET statement_timeout = '1500ms'; SELECT pgque.ticker()$$,
    current_database()
);

-- Maintenance: every 30 seconds
SELECT cron.schedule_in_database(
    'pgque_maint',
    '30 seconds',
    $$SET statement_timeout = '25s'; SELECT pgque.maint()$$,
    current_database()
);
```

See SPEC.md section 4.3 for the full pg_cron integration design including idempotent start, worker starvation detection, and graceful degradation without pg_cron.

#### 3.4.4 Remove `CREATE EXTENSION` dependency

PgQ installs via `CREATE EXTENSION pgq`. pgque installs via a single SQL file:

```
\i pgque.sql
SELECT pgque.start(); -- optional: creates pg_cron jobs
```

No `.control` file, no PGXS, no `pg_dump` extension handling. The install script is idempotent -- safe to re-run.

Uninstall:

```
SELECT pgque.uninstall(); -- stops pg_cron jobs + DROP SCHEMA pgque CASCADE
```

#### 3.4.5 SECURITY DEFINER hardening

PgQ's functions use `SECURITY DEFINER` but lack `search_path` pinning. pgque adds `SET search_path = pgque, pg_catalog` to every `SECURITY DEFINER` function. See SPEC.md section 3.2.7 for the full hardening rules.

PgQ example (vulnerable):

```sql
create function pgq.insert_event(...) ...
language plpgsql security definer;
```

pgque (hardened):

```sql
create function pgque.insert_event(...) ...
language plpgsql security definer
set search_path = pgque, pg_catalog;
```

Every function in pgque must follow this pattern. No exceptions.

#### 3.4.6 Drop `queue_per_tx_limit`

PgQ's `queue_per_tx_limit` uses C-level per-transaction state tracking via `GetTopTransactionId()`. This cannot be replicated cleanly in PL/pgSQL. The feature is rarely used. pgque drops it. The `queue_per_tx_limit` column is removed from `pgque.queue`.

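Returning to the "no exceptions" rule from section 3.4.5: it is easy to check mechanically after an install. The query below is a sketch of such an audit, not part of the pgque spec. It relies only on the standard `pg_proc` and `pg_namespace` catalogs, where a function-level `SET search_path = ...` clause is stored in `proconfig` as a `search_path=...` entry.

```sql
-- List SECURITY DEFINER functions in the pgque schema that do NOT pin search_path.
-- An empty result means every such function follows the hardening rule.
select p.oid::regprocedure as unpinned_function
from pg_catalog.pg_proc p
join pg_catalog.pg_namespace n on n.oid = p.pronamespace
where n.nspname = 'pgque'
  and p.prosecdef                          -- SECURITY DEFINER only
  and not exists (
        select 1
        from unnest(p.proconfig) as cfg(setting)   -- NULL proconfig yields no rows
        where starts_with(cfg.setting, 'search_path=')
      )
order by p.proname;
```

A check like this could run in CI against a freshly installed schema, so that a newly added function without the `set search_path` clause fails the build.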
#### 3.4.6.1 Add `queue_max_retries` column

pgque adds a `queue_max_retries` column to `pgque.queue`:

```sql
alter table pgque.queue add column queue_max_retries int4;
-- NULL means use default (5). Set via create_queue() JSONB options
-- or set_queue_config().
```

The `create_queue()` JSONB overload maps `"max_retries"` to this column. `nack()` reads `queue_max_retries` to decide retry vs. DLQ routing (see section 4.3).

#### 3.4.7 Drop `set default_with_oids = 'off'`

PgQ's `structure/tables.sql` sets `default_with_oids = 'off'` (removed in PG12). pgque drops this line.

#### 3.4.8 Clean up maintenance operations

PgQ's `maint_operations()` includes hardcoded references to `pgq_node` and `londiste` (checking for their procedures by name). pgque removes these -- Londiste and pgq_node are out of scope. The `queue_extra_maint` column is preserved but CHECK-constrained to NULL in v1 (see SPEC.md section 4.4.2).

#### 3.4.9 Add `pgque.config` singleton table

pgque adds a config table for pg_cron job tracking:

```sql
CREATE TABLE pgque.config (
    singleton bool PRIMARY KEY DEFAULT true CHECK (singleton),
    ticker_job_id bigint,
    maint_job_id bigint,
    installed_at timestamptz NOT NULL DEFAULT clock_timestamp()
);
INSERT INTO pgque.config (singleton) VALUES (true);
```

#### 3.4.10 Add lifecycle functions

Functions not present in PgQ:

| Function | Purpose |
|---|---|
| `pgque.start()` | Create pg_cron jobs, store job IDs |
| `pgque.stop()` | Remove pg_cron jobs |
| `pgque.uninstall()` | Stop + DROP SCHEMA pgque CASCADE |
| `pgque.status()` | Diagnostic dashboard (TABLE return type) |

#### 3.4.11 LISTEN/NOTIFY in ticker

PgQ's ticker does not emit notifications. pgque's ticker adds:

```sql
PERFORM pg_notify('pgque_' || queue_name, tick_id::text);
```

after each tick, enabling low-latency consumer wakeup.

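On the consumer side, the wakeup can be sketched with plain `LISTEN`. The example assumes a queue named `orders` (hypothetical) and simulates the ticker's notification by hand so that it runs on a stock PostgreSQL without pgque installed. In real use the `pg_notify` call is issued by the ticker, not by the consumer.

```sql
-- Consumer session: subscribe to the channel the ticker is described as using,
-- i.e. 'pgque_' || queue_name. Names that are not plain lowercase identifiers
-- would need double quotes here.
listen pgque_orders;

-- Stand-in for the ticker: announce that tick 42 was created.
select pg_notify('pgque_orders', '42');

-- In psql the session then reports an asynchronous notification on channel
-- "pgque_orders" with payload "42". Client drivers expose the same event
-- through their own notification APIs.

-- Stop listening when the consumer shuts down.
unlisten pgque_orders;
```

Notifications are delivered only to sessions connected at the time, so a consumer would normally treat them as a hint and keep a periodic poll as a fallback.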
#### 3.4.12 Summary of changes

| Area | PgQ v3.5.1 | pgque | Type of change |
|---|---|---|---|
| Schema | `pgq` | `pgque` | Rename |
| Snapshot functions | `txid_*` | `pg_*` | Rename |
| Transaction ID type | `bigint` | `xid8` | Type change |
| Snapshot type | `txid_snapshot` | `pg_snapshot` | Type change |
| Daemon | `pgqd` (external C) | `pg_cron` jobs | Architecture |
| Installation | `CREATE EXTENSION` | `\i pgque.sql` | Packaging |
| `search_path` pinning | Missing | On all SECURITY DEFINER functions | Security |
| `queue_per_tx_limit` | Supported (C) | Removed | Scope reduction |
| `default_with_oids` | Set to 'off' | Removed (PG12+) | Cleanup |
| `maint_operations` | pgq_node/Londiste hooks | Removed | Scope reduction |
| `config` table | Not present | Added | New |
| Lifecycle functions | Not present | `start/stop/uninstall/status` | New |
| LISTEN/NOTIFY | Not present | Ticker emits NOTIFY | New |
| PG minimum | PG 9.x+ | PG 14+ | Version bump |

---

## 4. The Modern API Layer

PgQ's native API is powerful but ceremony-heavy. A consumer must: `register_consumer` -> `next_batch` -> `get_batch_events` -> process each event -> `event_retry` for failures -> `finish_batch`. This is fine for infrastructure engineers; it is not what a product engineer expects from a queue.

pgque adds a simplified API layer that wraps PgQ's internals. The PgQ API remains available for advanced use cases. The modern API targets the 80% use case: send a JSON message, receive it, ack or nack it.
### 4.1 Publishing: `pgque.send()`

```sql
-- Default path: untyped literal resolves to send(text, text) -- verbatim bytes
select pgque.send('orders', '{"order_id": 42, "total": 99.95}');

-- Opt-in validation: explicit ::jsonb cast resolves to send(text, jsonb)
select pgque.send('orders', '{"order_id": 42, "total": 99.95}'::jsonb);

-- Send with explicit type (both overloads available on the same rules)
select pgque.send('orders', 'order.created', '{"order_id": 42}');
select pgque.send('orders', 'order.created', '{"order_id": 42}'::jsonb);

-- Send batch (text[] default; use ::jsonb[] cast to opt into validation)
select pgque.send_batch('orders', 'order.created', array[
    '{"order_id": 42}',
    '{"order_id": 43}',
    '{"order_id": 44}'
]);

-- Textual non-JSON payloads (XML, CSV, base64/hex-encoded binary) go
-- through the text overload as-is. Raw binary with NUL bytes is rejected
-- by PG text -- encode first (e.g. encode(raw_proto, 'base64')).
-- Note PG bytea hex input is a single \x prefix followed by hex digits;
-- per-byte separators are not allowed (use \x082a1063, not \x08\x2a\x10\x63).
select pgque.send('orders', 'order.xml', '');
select pgque.send('orders', 'order.proto_b64',
                  encode('\x082a1063'::bytea, 'base64'));

-- Delayed send (deliver after timestamp)
select pgque.send_at('orders', 'reminder.send', '{"user_id": 7}'::jsonb,
                     now() + interval '24 hours');

-- Priority: use separate queues (recommended) or ev_extra1 for
-- client-side sorting within a batch (see section 7.6)
```

**Payload type choice.** Storage is always `ev_data TEXT`; the two overloads differ only in whether PostgreSQL validates and reserializes the payload on the way in.

**Overload resolution.** An untyped SQL string literal has type `unknown`. When several candidate functions remain, PostgreSQL's function-resolution rules favor the string type category for `unknown` arguments, so the `text` overload wins over `jsonb` unless the caller casts explicitly.
So:

```sql
-- Untyped literal → resolves to send(text, text), bytes verbatim
select pgque.send('orders', '{"order_id": 42}');

-- Explicit ::jsonb → resolves to send(text, jsonb), validated + canonicalized
select pgque.send('orders', '{"order_id": 42}'::jsonb);
```

The `text` overload is therefore the natural default for plain SQL callers and for drivers that pass parameters as text (most of them). The `jsonb` overload is opt-in via an explicit `::jsonb` cast.

- `text` overload (default for untyped literals): fast path. Bytes flow straight through to `insert_event`. No parse, no reserialization, key order preserved. Required for non-JSON *textual* payloads (XML, CSV, base64/hex-encoded binary). Caller is responsible for validating the payload.
- `jsonb` overload (opt-in): PG rejects malformed JSON at parse time and the payload is stored in canonical form (keys sorted, whitespace normalized). Useful when you want PG to be the last line of defense against malformed JSON.

**NUL bytes.** `ev_data` is `text`, and PostgreSQL `text` does not accept NUL (`\x00`). Raw binary wire formats (protobuf, msgpack, Avro, packed bytea dumps) routinely contain NULs and will be rejected with `invalid byte sequence` at insert time. Callers that want to ship binary payloads must encode them first -- `encode(bytes, 'base64')`, `encode(bytes, 'hex')`, or a custom escape -- and decode on the consumer side. A future `bytea` overload could bypass this at the cost of changing `ev_data`'s storage type; deferred pending demand.

Both overloads return `bigint` (event ID). `receive()` returns payload as `text`, so the `text`-both-sides path is symmetric (no implicit canonicalization anywhere). The `jsonb`-in / `text`-out path leaves client-side JSON parsing to the consumer, which is where it has to live anyway (a PG composite type cannot polymorphically carry both `text` and `jsonb`).
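The encode-before-send rule for binary payloads can also be applied in the client rather than in SQL. The sketch below uses only the Python standard library; the helper names `encode_binary_payload` and `decode_binary_payload` are hypothetical and not part of any pgque client.

```python
import base64


def encode_binary_payload(raw: bytes) -> str:
    """Turn raw bytes (which may contain NUL) into text safe for ev_data."""
    return base64.b64encode(raw).decode("ascii")


def decode_binary_payload(text: str) -> bytes:
    """Reverse encode_binary_payload on the consumer side.

    With its default validate=False, b64decode discards characters outside
    the base64 alphabet, so it also tolerates the line breaks that
    PostgreSQL's encode(..., 'base64') inserts into long output.
    """
    return base64.b64decode(text)
```

The producer would pass the encoded string through the `text` overload of `send()`, and the consumer would decode the `payload` column returned by `receive()`. Base64 grows the payload by roughly a third, which is the cost of keeping `ev_data` as `text`.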
**Internal mapping:**

```sql
-- jsonb overloads (opt-in via ::jsonb cast; validation + canonicalization)
create function pgque.send(i_queue text, i_payload jsonb)
returns bigint as $$
begin
    return pgque.insert_event(i_queue, 'default', i_payload::text);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

create function pgque.send(i_queue text, i_type text, i_payload jsonb)
returns bigint as $$
begin
    return pgque.insert_event(i_queue, i_type, i_payload::text);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

-- text overloads (default for untyped literals; fast path, opaque payload)
create function pgque.send(i_queue text, i_payload text)
returns bigint as $$
begin
    return pgque.insert_event(i_queue, 'default', i_payload);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

create function pgque.send(i_queue text, i_type text, i_payload text)
returns bigint as $$
begin
    return pgque.insert_event(i_queue, i_type, i_payload);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

create function pgque.send_batch(
    i_queue text, i_type text, i_payloads jsonb[])
returns bigint[] as $$
declare
    ids bigint[] := '{}';
    p jsonb;
begin
    -- TODO: optimize to resolve queue/table once and bypass insert_event_raw
    -- with a single multi-VALUES insert. Currently each insert_event() call
    -- resolves the queue independently. Deferred to implementation.
    foreach p in array i_payloads loop
        ids := array_append(ids,
            pgque.insert_event(i_queue, i_type, p::text));
    end loop;
    return ids;
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

create function pgque.send_batch(
    i_queue text, i_type text, i_payloads text[])
returns bigint[] as $$
declare
    ids bigint[] := '{}';
    p text;
begin
    foreach p in array i_payloads loop
        ids := array_append(ids,
            pgque.insert_event(i_queue, i_type, p));
    end loop;
    return ids;
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;
```

### 4.2 Consuming: `pgque.receive()`

`receive()` wraps `next_batch` + `get_batch_events` into a single call that returns messages directly.

```sql
-- Receive messages (returns up to 100 from the current batch)
select * from pgque.receive('orders', 'order_processor', 100);
-- Returns: msg_id, batch_id, type, payload, retry_count, created_at,
--          extra1, extra2, extra3, extra4
```

**Return type:**

```sql
CREATE TYPE pgque.message AS (
    msg_id      bigint,       -- ev_id
    batch_id    bigint,       -- batch containing this message
    type        text,         -- ev_type
    payload     text,         -- ev_data (caller casts to jsonb if needed)
    retry_count int4,         -- ev_retry (NULL for first delivery)
    created_at  timestamptz,  -- ev_time
    extra1      text,         -- ev_extra1
    extra2      text,         -- ev_extra2
    extra3      text,         -- ev_extra3
    extra4      text          -- ev_extra4
);
```

**Implementation:**

```sql
CREATE FUNCTION pgque.receive(
    i_queue text,
    i_consumer text,
    i_max_return int DEFAULT 100)
RETURNS SETOF pgque.message AS $$
DECLARE
    v_batch_id bigint;
    ev record;
    cnt int := 0;
BEGIN
    -- Get next batch (may return NULL if no events)
    v_batch_id := pgque.next_batch(i_queue, i_consumer);
    IF v_batch_id IS NULL THEN
        RETURN;
    END IF;

    -- Yield messages from the batch
    FOR ev IN
        SELECT ev_id, ev_type, ev_data, ev_retry, ev_time,
               ev_extra1, ev_extra2, ev_extra3, ev_extra4
        FROM pgque.get_batch_events(v_batch_id)
    LOOP
        RETURN NEXT ROW(
            ev.ev_id, v_batch_id, ev.ev_type, ev.ev_data,
            ev.ev_retry, ev.ev_time,
            ev.ev_extra1, ev.ev_extra2, ev.ev_extra3, ev.ev_extra4
        )::pgque.message;
        cnt := cnt + 1;
        EXIT WHEN cnt >= i_max_return;
    END LOOP;
    RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

**Batch ownership semantics (critical -- read carefully):**

`receive()` opens a PgQ batch via `next_batch()`. A batch contains ALL events committed between two ticks. The `i_max_return` parameter limits how many messages are *returned to the caller*, but the batch contains all events regardless.

**When `ack(batch_id)` is called, the ENTIRE batch is finished** -- including events that were not returned due to `i_max_return`. This is not a bug; it matches PgQ's design where `finish_batch()` advances the consumer past the entire tick range.

**Users coming from per-message queues (SQS, PGMQ, Redis):** this is the single biggest conceptual difference. pgque is batch-oriented. The recommended consumer pattern is:

1. Call `receive()` -- get a batch of messages
2. Process ALL returned messages
3. For individual failures, call `nack(batch_id, msg, ...)` with the full message record to retry that event
4. Call `ack(batch_id)` -- finishes the batch and advances the consumer position
5. Nacked events reappear in a future batch via the retry queue

**Is `ack()` legal after some messages were nacked?** Yes. `nack()` copies the failed event into the retry queue with a delay. `ack()` then finishes the batch normally. The nacked event will reappear in a future batch when its retry delay expires.

**What about mixed outcomes?** Process all events in the batch. Call `nack()` for each failure. Call `ack()` to finish. This is the standard pattern and is how PgQ has always worked.

**Rotation blocking:** An open batch (from `receive()` with no subsequent `ack()`) blocks table rotation the same way a slow consumer does in raw PgQ. If the caller takes minutes to process, rotation cannot TRUNCATE the tables the batch reads from. Client libraries should enforce a maximum batch processing timeout and ack/nack on timeout.
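The recommended consumer pattern above can be sketched as a small driver-agnostic function. This is an illustration under stated assumptions, not a client-library implementation: `process_batch` is a hypothetical name, each message is assumed to be a dict keyed by the `pgque.message` column names, and `nack` / `ack` are caller-supplied callables that would wrap `select pgque.nack(...)` and `select pgque.ack(...)`.

```python
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]  # one row from pgque.receive(), keyed by column name


def process_batch(
    messages: List[Message],
    handler: Callable[[Message], None],
    nack: Callable[[int, Message], None],
    ack: Callable[[int], None],
) -> List[int]:
    """Process every returned message, nack failures, then ack the batch once.

    Returns the msg_id of each message that was nacked.
    """
    if not messages:
        return []  # no rows came back, so there is no batch_id to ack

    batch_id = messages[0]["batch_id"]  # all rows of one receive() share it
    nacked: List[int] = []
    for msg in messages:
        try:
            handler(msg)
        except Exception:
            # Individual failure: schedule a retry, keep going with the rest.
            nack(batch_id, msg)
            nacked.append(msg["msg_id"])

    # The batch is finished exactly once, after all nacks have been issued.
    ack(batch_id)
    return nacked
```

Catching every `Exception` is a simplification; a real client would likely distinguish transient from permanent failures and enforce the processing timeout mentioned under rotation blocking.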
### 4.3 Acknowledging: `pgque.ack()` and `pgque.nack()`

```sql
-- Ack: finish the batch, advance consumer position
select pgque.ack(batch_id);

-- Nack a single message: retry after 60 seconds
-- Pass the full message record (avoids re-querying the batch)
select pgque.nack(batch_id, msg, '60 seconds'::interval);

-- Nack with reason (goes to DLQ after max retries, read from queue config)
select pgque.nack(batch_id, msg, '60 seconds'::interval, 'upstream timeout');
```

**Internal mapping:**

```sql
create function pgque.ack(i_batch_id bigint)
returns integer as $$
begin
    return pgque.finish_batch(i_batch_id);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

create function pgque.nack(
    i_batch_id bigint,
    i_msg pgque.message,
    i_retry_after interval default '60 seconds',
    i_reason text default null)
returns integer as $$
declare
    v_max_retries int4;
begin
    -- Single lookup: subscription -> queue config
    select coalesce(q.queue_max_retries, 5) into v_max_retries
    from pgque.subscription s
    join pgque.queue q on q.queue_id = s.sub_queue
    where s.sub_batch = i_batch_id;

    if coalesce(i_msg.retry_count, 0) >= v_max_retries then
        -- Move to dead letter queue (pass event fields, no re-query)
        perform pgque.event_dead(i_batch_id, i_msg.msg_id,
            coalesce(i_reason, 'max retries exceeded'),
            i_msg.created_at, null::xid8, i_msg.retry_count,
            i_msg.type, i_msg.payload,
            i_msg.extra1, i_msg.extra2, i_msg.extra3, i_msg.extra4);
    else
        -- Retry after delay
        perform pgque.event_retry(i_batch_id, i_msg.msg_id,
            extract(epoch from i_retry_after)::integer);
    end if;
    return 1;
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;
```

### 4.4 Subscriptions: fan-out

PgQ already supports multiple consumers per queue.
pgque wraps this:

```sql
-- Subscribe a consumer (starts receiving from current position)
SELECT pgque.subscribe('orders', 'analytics_pipeline');
SELECT pgque.subscribe('orders', 'notification_sender');
SELECT pgque.subscribe('orders', 'audit_logger');

-- Each consumer receives independently
SELECT * FROM pgque.receive('orders', 'analytics_pipeline', 100);
SELECT * FROM pgque.receive('orders', 'notification_sender', 100);

-- Unsubscribe
SELECT pgque.unsubscribe('orders', 'analytics_pipeline');
```

**Internal mapping:**

```sql
CREATE FUNCTION pgque.subscribe(i_queue text, i_consumer text)
RETURNS integer AS $$
BEGIN
    RETURN pgque.register_consumer(i_queue, i_consumer);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

### 4.5 Dead Letter Queue

PgQ has a retry queue but no dead letter queue. pgque adds one.

**Table:**

```sql
CREATE TABLE pgque.dead_letter (
    dl_id          bigserial PRIMARY KEY,
    dl_queue_id    int4 NOT NULL REFERENCES pgque.queue(queue_id),
    dl_consumer_id int4 NOT NULL REFERENCES pgque.consumer(co_id),
    dl_time        timestamptz NOT NULL DEFAULT now(),
    dl_reason      text,
    -- Original event fields (copied from event at time of death)
    ev_id          bigint NOT NULL,
    ev_time        timestamptz NOT NULL,
    ev_txid        xid8,  -- NULL: pgque.message does not carry txid
                          -- (internal detail, meaningless after batch closes)
    ev_retry       int4,
    ev_type        text,
    ev_data        text,
    ev_extra1      text,
    ev_extra2      text,
    ev_extra3      text,
    ev_extra4      text
);

CREATE INDEX dl_queue_time_idx ON pgque.dead_letter (dl_queue_id, dl_time);
```

**Functions:**

```sql
-- Move event to DLQ (called by nack() when max retries exceeded);
-- event fields are passed in by the caller, see the implementation in this section
pgque.event_dead(batch_id bigint, event_id bigint, reason text,
                 ev_time timestamptz, ev_txid xid8, ev_retry int4,
                 ev_type text, ev_data text,
                 ev_extra1 text DEFAULT NULL, ev_extra2 text DEFAULT NULL,
                 ev_extra3 text DEFAULT NULL, ev_extra4 text DEFAULT NULL)
  RETURNS integer

-- Inspect DLQ
pgque.dlq_inspect(queue_name text, limit_count int DEFAULT 100)
  RETURNS SETOF pgque.dead_letter

-- Replay a dead letter event back into the queue
pgque.dlq_replay(dead_letter_id bigint)
  RETURNS bigint  -- new event ID

-- Replay all DLQ events for a queue
pgque.dlq_replay_all(queue_name
text)
  RETURNS integer  -- count of replayed events

-- Purge old DLQ entries
pgque.dlq_purge(queue_name text, older_than interval DEFAULT '30 days')
  RETURNS integer  -- count of purged entries
```

**`event_dead()` implementation:**

`nack()` already has the full message from `receive()`. Rather than re-querying the batch via `get_batch_events()` (which runs the full snapshot-based dual-filter query), `nack()` passes the event fields it already holds to `event_dead()`, which performs the DLQ insert directly. See the `nack()` implementation in section 4.3.

```sql
create function pgque.event_dead(
    i_batch_id bigint,
    i_event_id bigint,
    i_reason text,
    i_ev_time timestamptz,
    i_ev_txid xid8,
    i_ev_retry int4,
    i_ev_type text,
    i_ev_data text,
    i_ev_extra1 text default null,
    i_ev_extra2 text default null,
    i_ev_extra3 text default null,
    i_ev_extra4 text default null)
returns integer as $$
declare
    v_sub record;
begin
    -- Look up subscription from batch
    select sub_queue, sub_consumer into v_sub
    from pgque.subscription
    where sub_batch = i_batch_id;

    if not found then
        raise exception 'batch not found: %', i_batch_id;
    end if;

    -- Insert into dead letter table (no re-query of batch events)
    insert into pgque.dead_letter (
        dl_queue_id, dl_consumer_id, dl_reason,
        ev_id, ev_time, ev_txid, ev_retry, ev_type, ev_data,
        ev_extra1, ev_extra2, ev_extra3, ev_extra4)
    values (
        v_sub.sub_queue, v_sub.sub_consumer, i_reason,
        i_event_id, i_ev_time, i_ev_txid, i_ev_retry, i_ev_type, i_ev_data,
        i_ev_extra1, i_ev_extra2, i_ev_extra3, i_ev_extra4);

    return 1;
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;
```

### 4.6 Delayed / Scheduled Delivery

Events with a future delivery time go into a holding table. A maintenance step moves them to the main event table when their time arrives.
**Table:**

```sql
CREATE TABLE pgque.delayed_events (
    de_id         bigserial PRIMARY KEY,
    de_queue_name text NOT NULL,
    de_deliver_at timestamptz NOT NULL,
    de_type       text,
    de_data       text,
    de_extra1     text,
    de_extra2     text,
    de_extra3     text,
    de_extra4     text
);

CREATE INDEX de_deliver_idx ON pgque.delayed_events (de_deliver_at);
```

**`send_at()` implementation:**

**Return value semantics:** When delivery is immediate (`i_deliver_at <= now()`), returns the queue event ID (from `insert_event()`). When delivery is delayed, returns the **scheduled-entry ID** from `delayed_events.de_id` -- this is NOT a queue event ID. The actual queue event ID is assigned later when `maint_deliver_delayed()` moves the event into the queue. Client libraries should document this distinction clearly.

```sql
create function pgque.send_at(
    i_queue text, i_type text, i_payload jsonb, i_deliver_at timestamptz)
returns bigint as $$
begin
    if i_deliver_at <= now() then
        -- Deliver immediately; returns queue event ID
        return pgque.insert_event(i_queue, i_type, i_payload::text);
    end if;

    insert into pgque.delayed_events
        (de_queue_name, de_deliver_at, de_type, de_data)
    values (i_queue, i_deliver_at, i_type, i_payload::text);

    -- Returns scheduled-entry ID (NOT a queue event ID)
    return currval('pgque.delayed_events_de_id_seq');
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;
```

**Maintenance integration:**

`pgque.maint()` calls `pgque.maint_deliver_delayed()` which moves due events:

```sql
CREATE FUNCTION pgque.maint_deliver_delayed()
RETURNS integer AS $$
DECLARE
    ev record;
    cnt integer := 0;
BEGIN
    FOR ev IN
        DELETE FROM pgque.delayed_events
        WHERE de_deliver_at <= now()
        RETURNING *
    LOOP
        PERFORM pgque.insert_event(ev.de_queue_name, ev.de_type, ev.de_data,
            ev.de_extra1, ev.de_extra2, ev.de_extra3, ev.de_extra4);
        cnt := cnt + 1;
    END LOOP;
    RETURN cnt;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

### 4.7 Queue Management

```sql
-- Create a queue with options
SELECT pgque.create_queue('orders', '{
    "rotation_period": "4 hours",
    "ticker_max_count": 1000,
    "ticker_max_lag": "5 seconds",
    "max_retries": 10
}'::jsonb);

-- Pause/resume
SELECT pgque.pause_queue('orders');
SELECT pgque.resume_queue('orders');

-- Simplified wrappers around set_queue_config
CREATE FUNCTION pgque.pause_queue(i_queue text)
RETURNS void AS $$
BEGIN
    PERFORM pgque.set_queue_config(i_queue, 'ticker_paused', 'true');
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

### 4.8 API Summary: Modern vs. PgQ Primitives

| Modern API | PgQ primitive underneath | Notes |
|---|---|---|
| `pgque.send(queue, payload)` | `pgque.insert_event(queue, type, data)` | TEXT overload is default for untyped literals (fast path, opaque bytes); JSONB overload is opt-in via `::jsonb` cast (validation + canonicalization) |
| `pgque.send_batch(queue, type, payloads[])` | Loop of `insert_event()` calls | Single TX; `text[]` default, `jsonb[]` opt-in via `::jsonb[]` cast |
| `pgque.send_at(queue, type, payload, time)` | `delayed_events` table + `maint_deliver_delayed()` | New |
| `pgque.receive(queue, consumer, n)` | `next_batch()` + `get_batch_events()` | Combined |
| `pgque.ack(batch_id)` | `finish_batch(batch_id)` | Rename |
| `pgque.nack(batch_id, msg, delay)` | `event_retry(batch_id, msg_id, seconds)` | + DLQ logic, reads max_retries from queue config |
| `pgque.subscribe(queue, consumer)` | `register_consumer(queue, consumer)` | Rename |
| `pgque.unsubscribe(queue, consumer)` | `unregister_consumer(queue, consumer)` | Rename |
| `pgque.event_dead(batch, event_id, reason, ...)` | `dead_letter` table insert | New, accepts event fields from caller |
| `pgque.dlq_replay(dl_id)` | `insert_event()` from dead_letter row | New |
| `pgque.pause_queue(queue)` | `set_queue_config(queue, 'ticker_paused', 'true')` | Convenience |

The PgQ-style API (`insert_event`, `next_batch`, `get_batch_events`, `finish_batch`, `event_retry`) remains fully available for
users who need fine-grained control.

---

## 5. Observability

### 5.1 SQL Metrics Views

**`pgque.queue_stats()`** -- real-time queue health:

```sql
CREATE FUNCTION pgque.queue_stats()
RETURNS TABLE (
    queue_name      text,
    queue_id        int4,
    depth           bigint,       -- events pending across all consumers
    oldest_msg_age  interval,     -- age of oldest unconsumed event
    consumers       int4,         -- number of registered consumers
    events_per_sec  numeric,      -- throughput estimate (ticks-based)
    cur_table       int4,         -- current rotation table index
    rotation_age    interval,     -- time since last rotation
    rotation_period interval,     -- configured rotation period
    ticker_paused   boolean,
    last_tick_time  timestamptz,
    last_tick_id    bigint,
    dlq_count       bigint        -- dead letter queue entries
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        q.queue_name,
        q.queue_id,
        coalesce(
            (SELECT max(t_cur.tick_event_seq) - min(t_sub.tick_event_seq)
             FROM pgque.subscription s
             JOIN pgque.tick t_sub
               ON t_sub.tick_queue = q.queue_id
              AND t_sub.tick_id = s.sub_last_tick
             CROSS JOIN LATERAL (
                 SELECT tick_event_seq
                 FROM pgque.tick
                 WHERE tick_queue = q.queue_id
                 ORDER BY tick_id DESC
                 LIMIT 1
             ) t_cur
             WHERE s.sub_queue = q.queue_id
            ), 0)::bigint AS depth,
        (SELECT now() - min(t.tick_time)
         FROM pgque.subscription s
         JOIN pgque.tick t
           ON t.tick_queue = q.queue_id
          AND t.tick_id = s.sub_last_tick
         WHERE s.sub_queue = q.queue_id
        ) AS oldest_msg_age,
        (SELECT count(*)::int4
         FROM pgque.subscription
         WHERE sub_queue = q.queue_id) AS consumers,
        (SELECT CASE
                    WHEN t2.tick_time = t1.tick_time THEN 0
                    ELSE (t2.tick_event_seq - t1.tick_event_seq)::numeric
                         / extract(epoch from t2.tick_time - t1.tick_time)
                END
         FROM pgque.tick t1, pgque.tick t2
         WHERE t1.tick_queue = q.queue_id
           AND t2.tick_queue = q.queue_id
           AND t2.tick_id = (SELECT max(tick_id) FROM pgque.tick
                             WHERE tick_queue = q.queue_id)
           AND t1.tick_id = t2.tick_id - 1
        ) AS events_per_sec,
        q.queue_cur_table,
        now() - q.queue_switch_time AS rotation_age,
        q.queue_rotation_period,
        q.queue_ticker_paused,
        (SELECT max(tick_time) FROM pgque.tick WHERE
         tick_queue = q.queue_id) AS last_tick_time,
        (SELECT max(tick_id) FROM pgque.tick
         WHERE tick_queue = q.queue_id) AS last_tick_id,
        (SELECT count(*) FROM pgque.dead_letter
         WHERE dl_queue_id = q.queue_id)::bigint AS dlq_count
    FROM pgque.queue q
    ORDER BY q.queue_name;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

**`pgque.consumer_stats()`** -- per-consumer metrics:

```sql
CREATE FUNCTION pgque.consumer_stats()
RETURNS TABLE (
    queue_name     text,
    consumer_name  text,
    lag            interval,     -- time behind latest tick
    pending_events bigint,       -- estimated events to process
    last_seen      timestamptz,  -- sub_active timestamp
    batch_active   boolean,      -- has an open batch
    batch_id       bigint        -- current batch ID (NULL if none)
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        q.queue_name,
        c.co_name,
        now() - t.tick_time AS lag,
        coalesce(
            (SELECT max(t2.tick_event_seq)
             FROM pgque.tick t2
             WHERE t2.tick_queue = q.queue_id) - t.tick_event_seq,
            0)::bigint AS pending_events,
        s.sub_active,
        s.sub_batch IS NOT NULL,
        s.sub_batch
    FROM pgque.subscription s
    JOIN pgque.queue q ON q.queue_id = s.sub_queue
    JOIN pgque.consumer c ON c.co_id = s.sub_consumer
    LEFT JOIN pgque.tick t
      ON t.tick_queue = s.sub_queue
     AND t.tick_id = s.sub_last_tick
    ORDER BY q.queue_name, c.co_name;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

**`pgque.queue_health()`** -- operational diagnostics:

```sql
CREATE FUNCTION pgque.queue_health()
RETURNS TABLE (
    queue_name text,
    check_name text,
    status     text,  -- 'ok', 'warning', 'critical'
    detail     text
) AS $$
BEGIN
    -- Check: ticker is running (handle queues with no ticks yet)
    RETURN QUERY
    SELECT q.queue_name,
           'ticker_running'::text,
           CASE
               WHEN max(t.tick_time) IS NULL THEN 'critical'
               WHEN now() - max(t.tick_time) > interval '10 seconds' THEN 'critical'
               ELSE 'ok'
           END,
           'Last tick: ' || coalesce(max(t.tick_time)::text, 'never')
    FROM pgque.queue q
    LEFT JOIN pgque.tick t ON t.tick_queue = q.queue_id
    WHERE NOT q.queue_ticker_paused
    GROUP BY
             q.queue_name;

    -- Check: consumer lag
    RETURN QUERY
    SELECT q.queue_name,
           ('consumer_lag:' || c.co_name)::text,
           CASE
               WHEN now() - t.tick_time > q.queue_rotation_period THEN 'critical'
               WHEN now() - t.tick_time > q.queue_rotation_period / 2 THEN 'warning'
               ELSE 'ok'
           END,
           c.co_name || ' lag: ' || (now() - t.tick_time)::text
    FROM pgque.subscription s
    JOIN pgque.queue q ON q.queue_id = s.sub_queue
    JOIN pgque.consumer c ON c.co_id = s.sub_consumer
    JOIN pgque.tick t
      ON t.tick_queue = s.sub_queue
     AND t.tick_id = s.sub_last_tick;

    -- Check: rotation overdue
    RETURN QUERY
    SELECT q.queue_name,
           'rotation_health'::text,
           CASE
               WHEN q.queue_switch_step2 IS NULL THEN 'warning'
               WHEN now() - q.queue_switch_time > q.queue_rotation_period * 2 THEN 'warning'
               ELSE 'ok'
           END,
           CASE
               WHEN q.queue_switch_step2 IS NULL THEN 'mid-rotation (step2 pending)'
               ELSE 'last rotation: ' || q.queue_switch_time::text
           END
    FROM pgque.queue q;

    -- Check: DLQ growing
    RETURN QUERY
    SELECT q.queue_name,
           'dlq_health'::text,
           CASE
               WHEN count(dl.dl_id) > 1000 THEN 'warning'
               ELSE 'ok'
           END,
           count(dl.dl_id)::text || ' dead letter events'
    FROM pgque.queue q
    LEFT JOIN pgque.dead_letter dl ON dl.dl_queue_id = q.queue_id
    GROUP BY q.queue_name;

    -- Check: pg_cron jobs
    RETURN QUERY
    SELECT 'system'::text,
           'pg_cron_ticker'::text,
           CASE WHEN cfg.ticker_job_id IS NULL THEN 'critical' ELSE 'ok' END,
           CASE WHEN cfg.ticker_job_id IS NULL
                THEN 'ticker job not scheduled'
                ELSE 'job_id=' || cfg.ticker_job_id::text
           END
    FROM pgque.config cfg;

    RETURN QUERY
    SELECT 'system'::text,
           'pg_cron_maint'::text,
           CASE WHEN cfg.maint_job_id IS NULL THEN 'critical' ELSE 'ok' END,
           CASE WHEN cfg.maint_job_id IS NULL
                THEN 'maint job not scheduled'
                ELSE 'job_id=' || cfg.maint_job_id::text
           END
    FROM pgque.config cfg;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

**Historical metrics:**

```sql
-- Throughput over time (bucketed)
pgque.throughput(queue_name text, period interval, bucket_size interval)
  RETURNS
  TABLE (bucket_start timestamptz, events bigint, events_per_sec numeric)
-- Uses tick history to compute per-bucket throughput

-- Latency percentiles (estimated from tick-to-consume gap)
pgque.latency_percentiles(queue_name text, consumer_name text, period interval)
  RETURNS TABLE (p50 interval, p95 interval, p99 interval)

-- Error rate (retries + DLQ per time period)
pgque.error_rate(queue_name text, period interval, bucket_size interval)
  RETURNS TABLE (bucket_start timestamptz, retries bigint, dead_letters bigint)
```

**Operational views:**

```sql
-- Messages currently being processed
pgque.in_flight(queue_name text)
  RETURNS TABLE (consumer_name text, batch_id bigint, batch_age interval,
                 estimated_events bigint)

-- Consumers that haven't processed in a long time
pgque.stuck_consumers(threshold interval DEFAULT '1 hour')
  RETURNS TABLE (queue_name text, consumer_name text, lag interval,
                 last_active timestamptz)
```

### 5.2 OpenTelemetry Integration

pgque provides OTel-compatible metrics via SQL functions that can be polled by an OTel Collector sidecar or a pg_cron job that pushes to a collector.
**OTel Metric Mapping:**

| OTel Metric Name | Type | SQL Source |
|---|---|---|
| `pgque.queue.depth` | Gauge | `queue_stats().depth` |
| `pgque.queue.oldest_message_age_seconds` | Gauge | `queue_stats().oldest_msg_age` |
| `pgque.message.sent_total` | Counter | Tick event_seq delta |
| `pgque.message.acked_total` | Counter | Derived from `finish_batch` calls |
| `pgque.message.nacked_total` | Counter | `retry_queue` insertions |
| `pgque.message.dead_lettered` | Gauge | `dead_letter` current count (resets on `dlq_purge`) |
| `pgque.consumer.lag_seconds` | Gauge | `consumer_stats().lag` |
| `pgque.consumer.pending_events` | Gauge | `consumer_stats().pending_events` |
| `pgque.processing_latency_seconds` | Histogram | Consumer-side (client library) |
| `pgque.batch.size` | Histogram | Consumer-side (client library) |
| `pgque.ticker.duration_seconds` | Gauge | pg_cron `run_details` |

**OTel metrics export function** (called by pg_cron or external poller):

```sql
CREATE FUNCTION pgque.otel_metrics()
RETURNS TABLE (
    metric_name text,
    metric_type text,     -- 'gauge', 'counter'
    value       numeric,
    labels      jsonb     -- {"queue": "orders", "consumer": "processor"}
) AS $$
BEGIN
    -- Queue depth gauges
    RETURN QUERY
    SELECT 'pgque.queue.depth'::text, 'gauge'::text,
           qs.depth::numeric,
           jsonb_build_object('queue', qs.queue_name)
    FROM pgque.queue_stats() qs;

    -- Consumer lag gauges
    RETURN QUERY
    SELECT 'pgque.consumer.lag_seconds'::text, 'gauge'::text,
           extract(epoch from cs.lag)::numeric,
           jsonb_build_object('queue', cs.queue_name,
                              'consumer', cs.consumer_name)
    FROM pgque.consumer_stats() cs;

    -- DLQ size gauges (current row count, not a monotonic counter)
    RETURN QUERY
    SELECT 'pgque.message.dead_lettered'::text, 'gauge'::text,
           qs.dlq_count::numeric,
           jsonb_build_object('queue', qs.queue_name)
    FROM pgque.queue_stats() qs;

    -- Events per sec gauges
    RETURN QUERY
    SELECT 'pgque.queue.throughput'::text, 'gauge'::text,
           coalesce(qs.events_per_sec, 0),
           jsonb_build_object('queue', qs.queue_name)
    FROM pgque.queue_stats() qs;
END;
$$ LANGUAGE plpgsql
SECURITY DEFINER SET search_path = pgque, pg_catalog;
```

**OTel Traces:**

Trace propagation uses `ev_extra1` (or a dedicated field) to carry trace context. The pattern:

1. **Producer:** Client library serializes W3C traceparent into `ev_extra1` before calling `pgque.send()`
2. **Queue wait:** Duration between `ev_time` and consumer `receive()` is the queue span
3. **Consumer:** Client library extracts traceparent from `ev_extra1`, creates a child span for processing

This requires no SQL-side changes -- trace propagation is entirely in the client libraries (section 6).

**OTel Logs:**

Structured log events for key operations:

```sql
CREATE FUNCTION pgque.log_event(
    i_level text,
    i_component text,
    i_message text,
    i_attrs jsonb DEFAULT '{}')
RETURNS void AS $$
BEGIN
    RAISE LOG 'pgque.%.% %', i_component, i_level, i_message
        USING DETAIL = i_attrs::text;
END;
$$ LANGUAGE plpgsql;
```

Operations that emit log events: queue creation/deletion, consumer registration/unregistration, DLQ insertions, rotation steps, ticker anomalies (negative event counts, stale snapshots).

### 5.3 Export Architecture

```
┌──────────────────────────────────────────────────────┐
│  PostgreSQL                                          │
│                                                      │
│  pgque.otel_metrics()  ──> JSON rows                 │
│  pgque.queue_health()  ──> health check rows         │
│                                                      │
│  pg_cron (every 15s):                                │
│    SELECT * FROM pgque.otel_metrics()                │
│    -> write to pgque.metrics_buffer (optional)       │
│                                                      │
└────────────────────┬─────────────────────────────────┘
                     │
                     │  SQL poll (OTLP push from sidecar)
                     │  or pg_cron -> http_post (pg_net)
                     │
              ┌──────▼──────┐
              │    OTel     │
              │  Collector  │
              └──────┬──────┘
                     │
         ┌───────────┼───────────┐
         ▼           ▼           ▼
    Prometheus    Grafana     Datadog
     / Thanos      Tempo      / etc.
```

Two export paths:

1. **Sidecar poller:** An OTel Collector with a SQL receiver polls `pgque.otel_metrics()` every 15 seconds and converts rows to OTLP.
2. **pg_cron + pg_net:** For environments where a sidecar is not feasible, pg_cron calls a function that formats OTLP JSON and pushes via `pg_net` (HTTP from inside PostgreSQL).
   This is the fully self-contained option.

---

## 6. Client Libraries

Each library wraps pgque's SQL API into idiomatic patterns. The SQL layer handles all queue semantics; the client library handles connection lifecycle, error recovery, and developer experience.

### 6.1 Common Architecture

All client libraries share the same structure:

```
┌─────────────────────────────────────────┐
│  Application Code                       │
│  producer.send("orders", payload)       │
│  consumer.on("order.created", handler)  │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│  pgque Client Library                   │
│  - Connection pool awareness            │
│  - Graceful shutdown (drain + ack)      │
│  - Auto-reconnect with backoff          │
│  - OTel trace propagation               │
│  - Typed event dispatch                 │
│  - Batch transaction management         │
│  - LISTEN/NOTIFY wakeup + poll fallback │
└──────────────┬──────────────────────────┘
               │ SQL
┌──────────────▼──────────────────────────┐
│  PostgreSQL                             │
│  pgque.send() / pgque.receive()         │
│  pgque.ack() / pgque.nack()             │
└─────────────────────────────────────────┘
```

**Common features across all libraries:**

| Feature | Description |
|---|---|
| Connection pooling | Works with connection poolers (PgBouncer, Supavisor). Advisory locks and LISTEN require session-mode pooling; document this clearly. |
| Graceful shutdown | On SIGTERM/SIGINT: stop accepting new batches, finish current batch, ack, exit. No orphaned batches. |
| Auto-reconnect | Exponential backoff (100ms, 200ms, 400ms, ..., 30s cap). On reconnect, any in-progress batch is automatically redelivered by pgque. |
| OTel propagation | Inject W3C traceparent into `ev_extra1` on send. Extract and create child span on receive. |
| Batch transactions | The library wraps `receive -> process -> ack` in a database transaction. Crash = rollback = batch redelivered. |
| Health check | `library.health()` calls `pgque.queue_health()` and returns structured result. |

### 6.2 Python -- `pgque-py`

Built on `psycopg` (v3).
Feels like a Python library, not a SQL wrapper.

**Producer:**

```python
from datetime import timedelta

import pgque

conn = pgque.connect("postgresql://localhost/mydb")

# Simple send
conn.send("orders", {"order_id": 42, "total": 99.95})

# Typed send
conn.send("orders", type="order.created", payload={"order_id": 42})

# Batch send (single transaction); `orders` is an iterable of application objects
with conn.batch("orders") as batch:
    for order in orders:
        batch.send("order.created", order.to_dict())

# Delayed send
conn.send_at("orders", "reminder.send", {"user_id": 7}, delay=timedelta(hours=24))
```

**Consumer:**

```python
import pgque

consumer = pgque.Consumer(
    "postgresql://localhost/mydb",
    queue="orders",
    name="order_processor",
    poll_interval=30,  # seconds (fallback if NOTIFY missed)
    max_retries=5,
)

@consumer.on("order.created")
def handle_order(msg: pgque.Message):
    process_order(msg.payload)
    # Auto-acked if no exception raised

# Alternative to the handler above, with explicit error handling.
# Register only one handler per event type.
@consumer.on("order.created")
def handle_order_explicit(msg: pgque.Message):
    try:
        process_order(msg.payload)
    except TransientError:
        msg.nack(retry_after=60)  # retry in 60s
    except PermanentError:
        msg.dead_letter("invalid payload")

consumer.start()  # Blocks, processes until SIGTERM
```

**Internals:**

- `consumer.start()` runs: `LISTEN pgque_orders` + poll loop
- Each iteration: `SELECT * FROM pgque.receive('orders', 'order_processor', 100)`
- For each message: dispatch to registered handler based on `type`
- After batch: `SELECT pgque.ack(batch_id)` (in same TX as processing)
- On handler exception: `SELECT pgque.nack(batch_id, msg, '60 seconds', reason)`, passing the message itself so that `nack()` does not re-query the batch (same call shape as the test in section 13.2)

### 6.3 Go -- `pgque-go`

Built on `pgx/v5`. Follows Go conventions (context, interfaces, struct tags).
```go
package main

import (
	"context"
	"log"
	"time"

	pgque "github.com/pgque/pgque-go"
)

// Order is the example payload type.
type Order struct {
	ID    int     `json:"id"`
	Total float64 `json:"total"`
}

func main() {
	ctx := context.Background()

	client, err := pgque.Connect(ctx, "postgresql://localhost/mydb")
	if err != nil {
		log.Fatal(err)
	}

	// Producer
	client.Send(ctx, "orders", pgque.Event{
		Type:    "order.created",
		Payload: Order{ID: 42, Total: 99.95},
	})

	// Consumer
	consumer := client.NewConsumer("orders", "order_processor",
		pgque.WithPollInterval(30*time.Second),
		pgque.WithMaxRetries(5),
	)

	consumer.Handle("order.created", func(ctx context.Context, msg pgque.Message) error {
		var order Order
		msg.Decode(&order)
		// processOrder is the application's own function.
		// return nil = ack, return error = nack with retry
		return processOrder(ctx, order)
	})

	consumer.Start(ctx) // blocks until context cancelled
}
```

### 6.4 Node.js -- `pgque-js`

Built on `pg` (node-postgres). TypeScript-first.

```typescript
import { PgqueClient } from 'pgque-js';

const client = new PgqueClient('postgresql://localhost/mydb');

// Producer
await client.send('orders', {
  type: 'order.created',
  payload: { orderId: 42, total: 99.95 },
});

// Consumer
const consumer = client.consumer('orders', 'order_processor', {
  pollInterval: 30_000,
  maxRetries: 5,
});

consumer.on('order.created', async (msg) => {
  await processOrder(msg.payload);
  // auto-acked on success, nacked on thrown error
});

await consumer.start();
```

### 6.5 Ruby -- `pgque-rb`

Built on `pg` gem. Rails-friendly.

```ruby
require 'pgque'

client = Pgque::Client.new("postgresql://localhost/mydb")

# Producer
client.send("orders", type: "order.created", payload: { order_id: 42, total: 99.95 })

# Consumer (standalone)
consumer = Pgque::Consumer.new(
  client,
  queue: "orders",
  name: "order_processor",
  poll_interval: 30,
  max_retries: 5
)

consumer.on("order.created") do |msg|
  OrderService.process(msg.payload)
end

consumer.start  # blocks

# Rails integration (ActiveJob adapter)
class OrderJob < ApplicationJob
  queue_as :orders

  def perform(order_id)
    Order.find(order_id).process!
  end
end
```

---

## 7. Advanced Patterns

### 7.1 Transactional Receive (Exactly-Once Processing)

The default `receive` -> `ack` pattern provides at-least-once delivery. For exactly-once processing of effects that are written to the same database, run receive, processing, and ack in one transaction:

```sql
BEGIN;

-- Receive messages (inside the transaction).
-- SELECT ... INTO must precede FROM, so use CREATE TEMP TABLE ... AS instead.
CREATE TEMP TABLE msgs ON COMMIT DROP AS
SELECT * FROM pgque.receive('orders', 'processor', 100);

-- Process: write results to application tables (same TX)
INSERT INTO processed_orders (order_id, status)
SELECT (payload::jsonb->>'order_id')::int, 'done'
FROM msgs;

-- Ack the batch (same TX)
SELECT pgque.ack(
    (SELECT DISTINCT batch_id FROM msgs LIMIT 1)
);

COMMIT;

-- If anything fails, entire TX rolls back:
-- application writes AND ack are both undone.
-- Next receive() returns the same batch.
```

Client library support:

```python
@consumer.on("order.created", transactional=True)
def handle_order(msg, conn):
    # conn is the same connection/TX as the receive + ack
    conn.execute(
        "INSERT INTO processed_orders (order_id) VALUES (%s)",
        [msg.payload["order_id"]]
    )
    # auto-committed with ack if no exception
```

### 7.2 FIFO with Ordering Keys

PgQ processes events in `ev_id` order within a batch (the `ORDER BY 1` in `batch_event_sql`). For strict per-entity ordering:

```python
# Producer: use ev_extra1 as ordering key
conn.send("orders", type="order.updated", payload=order_data,
          extra1=f"customer:{customer_id}")

# Consumer: group by ordering key, process serially within group
@consumer.on("order.updated", ordered_by="extra1")
def handle_order(msg):
    # Framework ensures messages with same extra1 value
    # are processed sequentially, even across batches
    update_customer_state(msg.payload)
```

The ordering guarantee within a single batch is inherent (events are ordered by `ev_id`). Cross-batch ordering for a single key requires the client library to track key -> last_processed_batch_id and hold back messages if a previous batch for the same key is still in flight.
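The hold-back rule for cross-batch ordering can be sketched as a small in-memory gate. This is an illustration under stated assumptions, not the design of any pgque client library: `KeyOrderGate`, `admit`, and `complete` are hypothetical names, and the sketch assumes the caller presents messages in `ev_id` order and reports when a batch has finished with a key.

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field


@dataclass
class KeyOrderGate:
    """Hypothetical helper: serializes work per ordering key across batches."""

    # ordering key -> batch_id currently allowed to process that key
    in_flight: dict = field(default_factory=dict)
    # ordering key -> (batch_id, message) pairs held back, in arrival order
    held: dict = field(default_factory=lambda: defaultdict(deque))

    def admit(self, key, batch_id, msg) -> bool:
        """Return True if msg may be processed now; otherwise hold it back."""
        owner = self.in_flight.get(key)
        if owner is not None and owner != batch_id:
            # An earlier batch still owns this key: queue the message.
            self.held[key].append((batch_id, msg))
            return False
        self.in_flight[key] = batch_id
        return True

    def complete(self, key, batch_id) -> list:
        """Release the key; return held messages that are now runnable."""
        if self.in_flight.get(key) != batch_id:
            return []
        del self.in_flight[key]
        queue = self.held.get(key)
        if not queue:
            return []
        # Hand the key to the oldest waiting batch and release its messages.
        next_batch = queue[0][0]
        self.in_flight[key] = next_batch
        released = []
        while queue and queue[0][0] == next_batch:
            released.append(queue.popleft()[1])
        return released
```

A consumer loop would call `admit()` before dispatching each message and `complete()` once the owning batch has been acked. The state here is process-local, so it only orders work inside one consumer process.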
### 7.3 Rate Limiting

Client-side rate limiting using `next_batch_custom`:

```python
from datetime import timedelta

import pgque

dsn = "postgresql://localhost/mydb"

consumer = pgque.Consumer(
    dsn,
    queue="notifications",
    name="email_sender",
    # Process at most 1 batch per 5 seconds
    min_interval=timedelta(seconds=5),
    # Wait until batch has at least 50 events
    min_count=50,
    # Only process events older than 10 seconds
    min_lag=timedelta(seconds=10),
)
```

These map directly to `pgque.next_batch_custom(queue, consumer, min_lag, min_count, min_interval)`.

For hard rate limiting (e.g., max 100 emails/minute), implement a token bucket in the client:

```python
@consumer.on("notification.send", rate_limit=pgque.RateLimit(max_per_minute=100))
def send_notification(msg):
    send_email(msg.payload)
    # Framework automatically throttles by sleeping between events
    # Unprocessed events stay in the batch for next iteration
```

### 7.4 Cron / Scheduled Jobs

pgque uses `send_at()` for future delivery, and recommends `pg_cron` for recurring schedules:

```sql
-- One-time scheduled delivery
SELECT pgque.send_at('reminders', 'reminder.send',
    '{"user_id": 7}'::jsonb, now() + interval '24 hours');

-- Recurring jobs: use pg_cron to insert events
SELECT cron.schedule('daily_report', '0 9 * * *',
    $$SELECT pgque.send('jobs', 'report.generate', '{"type": "daily"}'::jsonb)$$);
```

### 7.5 Message Expiry / TTL

Queue-level TTL causes messages older than the threshold to be silently skipped during batch retrieval:

```sql
-- Configure TTL on a queue
SELECT pgque.set_queue_config('notifications', 'event_ttl', '7 days');
```

Implementation: `get_batch_events()` adds a filter:

```sql
AND ev_time > now() - queue_event_ttl
```

Messages that expire are not retried and not dead-lettered -- they simply vanish. This is appropriate for notification-style queues where stale messages are worthless.

### 7.6 Priority Queues

pgque does not natively support per-message priority within a batch (all events in a batch are processed in `ev_id` order).
For priority processing, use separate queues:

```sql
SELECT pgque.create_queue('orders_high');
SELECT pgque.create_queue('orders_normal');
SELECT pgque.create_queue('orders_low');
```

The client library's multi-queue consumer processes higher-priority queues first:

```python
consumer = pgque.MultiQueueConsumer(dsn, [
    # (queue, consumer name, priority); lower number = higher priority
    ("orders_high", "processor", 0),
    ("orders_normal", "processor", 1),
    ("orders_low", "processor", 2),
])
# Drains orders_high before touching orders_normal
```

Alternatively, store a priority value in `ev_extra1` via `insert_event()`. The client library sorts messages within a batch by this field before dispatching. This is client-side sorting, not queue-level priority: all events within a batch are equal at the PgQ level.

---

## 8. Implementation Plan

Most code already exists in PgQ's PL/pgSQL source. The plan is organized by what is packaging work vs. new development, broken into sprints with concrete deliverables and test plans.

### 8.0 PgQ Code Import Strategy

**pgque must not modify PgQ's core source code in-place.** The PgQ engine (snapshot isolation, batch processing, table rotation, consumer tracking) is proven code with 15+ years of production validation. We import it as a dependency and apply transformations mechanically during the build step.

**Approach: git submodule.**

```
pgque/
  pgq/              -- git submodule pointing to github.com/pgq/pgq
  sql/
    pgque.sql       -- built from pgq/ sources + pgque additions
  build/
    transform.sh    -- mechanical rename + modernization script
```

The `pgq/` submodule pins to a specific PgQ release tag (v3.5.1). The build script (`transform.sh`) reads PgQ's PL-only source files, applies the mechanical transformations (schema rename, `txid_*` -> `pg_*`, `xid8`, `search_path` pinning, cleanup), and concatenates the result with pgque's new code (modern API, DLQ, delayed events, observability) into `pgque.sql`.
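To make "mechanical transformation" concrete, here is a minimal sketch of two of the rewrite steps (schema rename and `txid_*` -> `pg_*`). It is written in Python purely for illustration; the actual build script is `transform.sh`, and the `transform` function and `TXID_RENAMES` table below are hypothetical names. The function mapping itself reflects the renames PostgreSQL introduced in version 13. `search_path` pinning, `xid8` column changes, and cleanup are not covered.

```python
import re

# Old txid_* names -> pg_* names available since PostgreSQL 13.
TXID_RENAMES = {
    "txid_current_if_assigned": "pg_current_xact_id_if_assigned",
    "txid_current_snapshot": "pg_current_snapshot",
    "txid_current": "pg_current_xact_id",
    "txid_snapshot_xmin": "pg_snapshot_xmin",
    "txid_snapshot_xmax": "pg_snapshot_xmax",
    "txid_snapshot_xip": "pg_snapshot_xip",
    "txid_visible_in_snapshot": "pg_visible_in_snapshot",
    "txid_status": "pg_xact_status",
    "txid_snapshot": "pg_snapshot",  # the data type, not a function
}


def transform(sql: str) -> str:
    """Apply the schema rename and txid_* renames to a chunk of SQL text."""
    # `pgq.` -> `pgque.`; the word boundary leaves `pgq_node.` untouched.
    out = re.sub(r"\bpgq\.", "pgque.", sql)
    # Whole-identifier matches only, so `txid_current` never clobbers
    # `txid_current_snapshot`.
    for old, new in TXID_RENAMES.items():
        out = re.sub(rf"\b{old}\b", new, out)
    return out
```

Because every pattern matches whole identifiers, running the function twice yields the same output as running it once, which is the property a reproducible build needs. A text-level rewrite like this cannot tell identifiers from string literals, so the regression suite remains the real check.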
**Why git submodule:** - PgQ upstream changes are visible via submodule diff - Clear separation: pgq code is never edited, only transformed - Updating to a new PgQ release is a submodule pointer update + re-test - Build is reproducible: submodule pin + transform script = deterministic output - License compliance: PgQ's ISC-licensed source is preserved unmodified **Why not fork/copy:** Copying PgQ files into pgque and editing them in-place creates a maintenance burden — any upstream fix requires manual cherry-picking across renamed files. The submodule + transform approach keeps the upstream relationship clean. ### Sprint 1: Repackaging (2 weeks) **Nature:** Mechanical transformation. No new logic. **Deliverables:** 1. Set up PgQ as a git submodule (`pgq/`, pinned to v3.5.1) 2. Create `build/transform.sh` that reads PL-only source files and applies all mechanical transformations (items 3-9 below) 3. Global rename: `pgq.` -> `pgque.` (schema prefix in ~40 files) 4. Replace `txid_*` with `pg_*` functions (8 distinct replacements) 5. Replace `bigint` with `xid8` for txid columns (schema + functions) 6. Replace `txid_snapshot` type with `pg_snapshot` 7. Add `SET search_path = pgque, pg_catalog` to all `SECURITY DEFINER` functions 8. Remove `queue_per_tx_limit` column and references 9. Remove `set default_with_oids = 'off'` 10. Remove `maint_operations` pgq_node/Londiste hooks 11. Add `pgque.config` table 12. Build concatenated `pgque.sql` and `pgque-unpgque.sql` 13. Create roles: `pgque_reader`, `pgque_writer`, `pgque_admin` 14. 
Regression tests: run PgQ's existing test suite against pgque

**Tests for Sprint 1:**

- Every PgQ test case (from `sql/` + `expected/`) passes against pgque after rename
- `\i pgque.sql` is idempotent (run twice, no errors)
- `pgque-unpgque.sql` cleanly removes all objects
- `pgque_reader` can call `get_queue_info()` but not `insert_event()`
- `pgque_writer` can call `insert_event()` but not `drop_queue()`
- `pgque_admin` can call all functions including `drop_queue()`
- All `SECURITY DEFINER` functions have `search_path` pinned (automated grep check)

**Verification:** The test from `sql/switch_plonly.sql` proves the PL/pgSQL code is correct. After renaming, every PgQ test case should pass identically.

**Line count estimate:** ~4,028 lines to transform. Zero new logic.

### Sprint 2: pg_cron Lifecycle (1 week)

**Nature:** New code, but simple (pg_cron API calls).

**Deliverables:**

1. `pgque.start()` -- creates pg_cron jobs, stores IDs in `pgque.config`
2. `pgque.stop()` -- removes pg_cron jobs
3. `pgque.uninstall()` -- stop + DROP SCHEMA
4. `pgque.status()` -- diagnostic dashboard
5. Graceful degradation when pg_cron not installed
6. LISTEN/NOTIFY in ticker: add `pg_notify()` call

**Tests for Sprint 2:**

- `pgque.start()` creates exactly 2 pg_cron jobs with correct schedules
- `pgque.start()` is idempotent (calling twice does not create duplicate jobs)
- `pgque.stop()` removes both jobs; `pgque.status()` reports no active jobs
- `pgque.status()` returns correct TABLE rows (ticker status, maint status, pg version)
- Without pg_cron installed: `pgque.start()` raises informative error; manual `pgque.ticker()` and `pgque.maint()` still work
- `LISTEN pgque_<queue_name>` receives notifications after `pgque.ticker()` runs
- `pg_notify` channel name matches `'pgque_' || queue_name` exactly

**Line count estimate:** ~200-300 new lines.

### Sprint 3: Modern API Layer (2 weeks)

**Nature:** New code wrapping existing primitives.

**Deliverables:**

1. `pgque.message` type definition
2.
`pgque.send()` / `send_batch()` / `send_at()` 3. `pgque.receive()` 4. `pgque.ack()` / `pgque.nack()` 5. `pgque.subscribe()` / `unsubscribe()` 6. `pgque.dead_letter` table + `event_dead()` + `dlq_inspect/replay/purge` 7. `pgque.delayed_events` table + `maint_deliver_delayed()` 8. `pgque.pause_queue()` / `resume_queue()` 9. `pgque.create_queue()` overload with JSONB options **Tests for Sprint 3:** - `pgque.send()` returns a valid event ID; event appears in `get_batch_events()` after tick - `pgque.send_batch()` inserts all payloads atomically (rollback leaves zero events) - `pgque.receive()` returns messages with correct fields; returns empty set when no events - `pgque.ack()` advances consumer position; subsequent `receive()` gets next batch - `pgque.nack()` with retry count < max schedules retry; event reappears after delay - `pgque.nack()` with retry count >= max moves event to `dead_letter` table - `pgque.dlq_replay()` re-inserts event into queue; `dlq_purge()` removes old entries - `pgque.send_at()` with future timestamp inserts into `delayed_events`; `maint_deliver_delayed()` moves it to queue when due - `pgque.send_at()` with past timestamp inserts directly into queue - `pgque.pause_queue()` stops ticker from generating ticks for that queue; `resume_queue()` restarts - `pgque.subscribe()` / `unsubscribe()` correctly manage consumer registration - `peek()` is deferred to v2 (semantics need rigorous definition — "peek without claiming" vs "read existing batch" are different operations) **Line count estimate:** ~500-700 new lines. ### Sprint 4: Observability (1 week) **Nature:** New code (views and functions over existing tables). **Deliverables:** 1. `pgque.queue_stats()` function 2. `pgque.consumer_stats()` function 3. `pgque.queue_health()` diagnostic function 4. `pgque.otel_metrics()` export function 5. `pgque.throughput()`, `latency_percentiles()`, `error_rate()` historical functions 6. `pgque.in_flight()`, `pgque.stuck_consumers()` operational functions 7. 
Log event integration in key operations **Tests for Sprint 4:** - `pgque.queue_stats()` returns correct depth, consumer count, and DLQ count for a queue with known state - `pgque.consumer_stats()` shows correct lag and pending event count - `pgque.queue_health()` returns 'critical' for a queue with no recent ticks - `pgque.queue_health()` returns 'warning' for consumer lag > rotation_period / 2 - `pgque.queue_health()` returns 'ok' for healthy queue - `pgque.otel_metrics()` returns rows with correct metric names, types, and label structure - `pgque.stuck_consumers()` identifies consumers with lag exceeding threshold - `pgque.in_flight()` shows open batches with correct age **Line count estimate:** ~400-600 new lines. ### Sprint 5: Client Libraries — v1: Python + Go only (3-4 weeks) **Nature:** New code. Python and Go cover the two largest PostgreSQL user bases. Node.js and Ruby SDKs are deferred to v2 — shipping two solid libraries is better than four incomplete ones (per reviewer feedback and risk table). 
**v1 libraries:** | Library | Language | DB Driver | Estimated Size | |---|---|---|---| | `pgque-py` | Python 3.10+ | psycopg 3 | ~800-1200 lines | | `pgque-go` | Go 1.21+ | pgx/v5 | ~1000-1500 lines | **v2 libraries (deferred):** | Library | Language | DB Driver | Estimated Size | |---|---|---|---| | `pgque-js` | TypeScript/Node 18+ | pg / node-postgres | ~800-1200 lines | | `pgque-rb` | Ruby 3.1+ | pg gem | ~600-1000 lines | Each library includes: - Producer class (send, send_batch, send_at) - Consumer class (receive, ack, nack, event dispatch) - Connection management (pool, reconnect, graceful shutdown) - LISTEN/NOTIFY integration - OTel trace propagation - Tests **Tests for Sprint 5 (per library):** - Producer: send event, verify it arrives in batch after tick - Consumer: receive + ack cycle completes; unacked batch is redelivered on reconnect - Graceful shutdown: SIGTERM during batch processing finishes current batch and acks before exit - LISTEN/NOTIFY: consumer wakes within 100ms of tick (not waiting for full poll interval) - OTel: traceparent propagated from producer send to consumer receive - Connection failure: auto-reconnect with backoff; no orphaned batches - Batch transaction: crash mid-processing rolls back both application writes and ack ### Sprint 6: Testing, Benchmarks, Docs (2 weeks) **Deliverables:** 1. Full regression suite (SQL-based, PG 14-18 matrix) 2. Performance benchmarks (see SPEC.md section 10 for methodology) 3. CI pipeline (GitHub Actions, multi-PG-version) 4. README, migration guides, API reference 5. 
Example applications in each language

**Tests for Sprint 6:**

- All SQL tests pass on PG 14, 15, 16, 17, 18
- Benchmark: insert throughput >= 10k events/sec on reference hardware
- Benchmark: sustained load (1 hour) shows zero dead tuple growth in event tables
- Benchmark: consumer latency p50 < 3s, p99 < 5s with default tick interval
- CI: GitHub Actions workflow runs full test suite on push; matrix covers all PG versions
- Documentation: every public function has a docstring; README covers install, quickstart, API reference

### Total Timeline

| Sprint | Duration | Dependencies |
|---|---|---|
| Sprint 1: Repackaging | 2 weeks | None |
| Sprint 2: pg_cron | 1 week | Sprint 1 |
| Sprint 3: Modern API | 2 weeks | Sprint 1 |
| Sprint 4: Observability | 1 week | Sprint 1 |
| Sprint 5: Client libraries (v1: Python + Go) | 3-4 weeks | Sprint 3 |
| Sprint 6: Testing + docs | 2 weeks | All |

**Critical path:** Sprint 1 -> Sprint 3 -> Sprint 5 -> Sprint 6 = ~9-10 weeks when run strictly in sequence (2 + 2 + 3-4 + 2).

Sprints 2, 3, and 4 can run in parallel after Sprint 1 completes. Sprint 5 (client libraries) can start after Sprint 3 defines the API, and the two v1 libraries (Python and Go) can be developed simultaneously.

With 2-3 engineers: **~8-10 weeks** to a complete release.

---

## 9. Team / Staffing

pgque is a repackaging + extensions project, not a ground-up build. The core queue engine (snapshot isolation, batch processing, table rotation) already exists and has 15+ years of production validation. This means the team is smaller than a typical queue system build.

### 9.1 Minimum viable team: 2 people

**E1: Senior PostgreSQL engineer.** Owns Sprint 1 (repackaging), Sprint 2 (pg_cron lifecycle), and Sprint 4 (observability). Must understand PgQ internals, `pg_snapshot` functions, rotation mechanics, and SECURITY DEFINER hardening. This person writes the SQL that pgque is built on.

**E2: Application developer.** Owns Sprint 3 (modern API) and Sprint 5 (client libraries). Must be comfortable with PL/pgSQL and at least 2 of Python/Go/Node.js/Ruby.
This person builds the developer experience layer. Sprint 6 (testing, benchmarks, docs) is shared between both engineers. ### 9.2 Week-by-week Gantt (2-person team) ``` Week E1 (PG internals) E2 (API + libraries) ──── ─────────────────────────────── ────────────────────────────── 1-2 Sprint 1: Repackaging Sprint 1: assist with test (rename, modernize, build) porting, CI setup 3 Sprint 2: pg_cron lifecycle Sprint 3: message type, (start/stop/status) send/receive/ack/nack 4 Sprint 4: queue_stats, Sprint 3: DLQ, delayed consumer_stats, health events, pause/resume 5 Sprint 4: otel_metrics, Sprint 5: pgque-py historical metrics (Python library) 6 Sprint 6: SQL regression Sprint 5: pgque-go tests, benchmarks (Go library) 7 Sprint 6: benchmark run, Sprint 5: pgque-py + pgque-go stress tests integration tests, examples 8 Sprint 6: docs, README, Sprint 6: SDK docs, example release packaging apps, migration guides ``` ### 9.3 Three-person variant Adding a third engineer (E3) compresses the timeline to ~6 weeks: - **E3: Test / DevOps engineer.** Takes over Sprint 6 duties (CI pipeline, multi-PG matrix testing, benchmark harness) starting in week 3. This frees E1 and E2 to focus on core development. E3 also writes example applications and migration guides. ``` Week E1 (PG internals) E2 (API + libraries) E3 (Test + DevOps) ──── ───────────────────── ───────────────────── ──────────────────── 1-2 Sprint 1: Repackaging Sprint 1: assist CI pipeline setup, PG matrix, fixtures 3 Sprint 2: pg_cron Sprint 3: modern API Sprint 1 regression test validation 4 Sprint 4: observability Sprint 3: DLQ, delayed Sprint 3 test suite, API integration tests 5 Sprint 4: otel metrics Sprint 5: pgque-py Benchmark harness, sustained load test 6 Docs, release review Sprint 5: pgque-go Sprint 5 library tests, examples ``` ### 9.4 Hiring considerations The hardest role to fill is E1. The required combination of PgQ internals knowledge, snapshot function expertise, and PL/pgSQL fluency is rare. 
If E1 is not available, the project timeline extends because Sprint 1 (the critical path foundation) cannot begin. E2 is easier to source — any experienced application developer with PL/pgSQL exposure and polyglot language skills can ramp up on the modern API layer in a few days, since it is thin wrappers over well-documented PgQ primitives. --- ## 10. Risks and Mitigations | Risk | Likelihood | Impact | Mitigation | |------|-----------|--------|------------| | PgQ PL/pgSQL code has undiscovered bugs in edge cases | Low | High | PgQ has 15+ years of production use. Run full PgQ test suite against pgque in Sprint 1. Focus testing on snapshot boundary conditions and cross-rotation batches. | | Schema rename introduces subtle breakage | Medium | Medium | Automated rename with `sed` + comprehensive regression tests. Each function tested individually. Grep for any remaining `pgq.` references in output. | | `pg_snapshot` function behavior differs from `txid_*` | Very Low | High | Functions are documented aliases since PG13. Test with PG14+ specifically. Compare `pg_current_snapshot()` output with `txid_current_snapshot()` on PG13 (where both exist) to validate equivalence. | | Modern API adds unexpected overhead vs raw PgQ API | Low | Low | Modern API is thin wrappers (1-2 SQL calls each). Benchmark both `pgque.send()` and `pgque.insert_event()` paths. Overhead should be <5% per call. | | Client libraries fragment maintenance effort | Medium | Medium | Start with Python + Go only. Add Node.js + Ruby based on demand. Each library is <1500 lines and shares the same architecture, so maintenance burden is proportional and predictable. | | Tick-based latency (1-2s) deters real-time use cases | Medium | Low | Document clearly in section 2.7 (When NOT to use pgque). LISTEN/NOTIFY reduces to ~100ms. For sub-10ms, recommend graphile-worker or direct LISTEN/NOTIFY. 
| | Long-lived transactions degrade rotation and cleanup | Medium | Medium | `pgque.queue_health()` alerts when consumer lag exceeds rotation_period / 2. Documentation includes max lag guidance. Operational runbook defines escalation procedures. Consider `queue_max_consumer_lag` config parameter that triggers warnings. | | pg_cron not available on a target provider | Low | High | Ticker and maint work from any scheduler. Document cron, systemd timer, `\watch`, and application-loop alternatives. Test all paths. | | pg_cron 1-second minimum granularity insufficient | Low | Low | PgQ's `pgqd` default was 1-2 seconds anyway. Sub-second ticking has diminishing returns. Document this. | | Old-style `INHERITS` deprecated in future PG | Very Low | High | INHERITS has been stable for 20+ years. Native partitioning is available but has different semantics for the rotation pattern (see SPEC.md 3.2.5). Monitor PG development. Migration path exists if needed. | | `retry_queue` bloats under high-retry workloads with MVCC horizon pinning | Low | Low | Add `VACUUM pgque.retry_queue` to the `maint()` cycle. Monitor in benchmarks. Retry queue is small relative to event tables. | --- ## 11. Best Practices ### 11.1 Producers - **Prefer `pgque.send()` for simplicity, `pgque.insert_event()` for control.** `send()` is the right choice for 80% of use cases (JSON payload, auto type). Use `insert_event()` when you need to set `ev_extra1..4`, use non-JSON payloads, or need maximum insert performance (one fewer function call). - **Batch sends in transactions for throughput.** Group many `send()` or `insert_event()` calls in a single transaction (e.g., 100-1000 per COMMIT). This amortizes transaction overhead and is expected to achieve high throughput. - **Use `ev_type` for routing, keep `ev_data` compact.** Consumers dispatch on `ev_type`. A well-chosen type (e.g., `order.created`, `user.updated`) lets a single queue serve multiple event kinds. 
Store references (IDs, keys) in `ev_data` rather than full payloads when possible.
- **Don't insert events in long-running transactions.** Events become visible only after COMMIT. A transaction that runs for 5 minutes and inserts events means those events are invisible for 5 minutes, and the ticker cannot advance past them.
- **Prefer direct API over triggers for high-throughput queues.** `pgque.insert_event()` is faster than CDC triggers because it skips column introspection, serialization, and dynamic SQL. Use triggers for CDC (capture all changes); use the API when the application knows exactly what event to produce.

### 11.2 Consumers

- **Always ack batches.** An unfinished batch blocks the consumer's position. Call `pgque.ack(batch_id)` after processing. If you crash before acking, the same batch is redelivered on next `pgque.receive()`; this is at-least-once delivery. Exactly-once effects require idempotent consumers or the transactional pattern (section 7.1).
- **Make handlers idempotent.** If a consumer crashes after processing some events but before `ack()`, the entire batch is redelivered. Design handlers so that reprocessing an event is harmless (e.g., use upserts, check idempotency keys).
- **Use `pgque.nack()` for transient failures, let DLQ handle permanent ones.** Don't fail the entire batch because one event has a temporary problem. Nack that event (it retries with delay) and ack the batch. After max retries, `nack()` automatically moves the event to the dead letter queue.
- **Use LISTEN for low-latency wakeup with polling fallback.** Instead of polling `receive()` in a tight sleep loop, `LISTEN pgque_<queue_name>` and wake on notification. Always combine with a polling fallback (e.g., poll every 30s in case a notification was missed). LISTEN requires session-mode connection pooling.
- **Monitor consumer lag via `pgque.queue_health()`.** A consumer falling behind blocks table rotation. Set up alerts for lag exceeding half the rotation period.
- **Use `next_batch_custom()` for batching control.** `min_count` and `min_interval` let you batch small trickles into larger units of work, reducing per-batch overhead. ### 11.3 Queue Design - **One queue per event stream, not one queue per event type.** Put related events (`order.created`, `order.updated`, `order.cancelled`) in the same queue. Use `ev_type` to distinguish. This gives consumers a consistent, ordered view of the stream. - **Multiple consumers for fan-out.** Each registered consumer gets its own independent position. Use this for fan-out: one queue, multiple consumers (audit logger, notification sender, analytics pipeline), each processing at their own pace. - **Don't create too many queues.** Each queue has 3 event tables, 2 sequences, and tick history. The ticker iterates all queues every 1-2 seconds. 100+ queues is fine; 10,000 queues will slow the ticker. ### 11.4 Operations - **Let pg_cron handle the ticker.** Don't run external ticker scripts alongside pg_cron — they will contend over tick generation and produce duplicate ticks. - **Monitor `pgque.queue_health()`.** It catches ticker stalls, consumer lag, rotation issues, and DLQ growth in one call. Wire it into your alerting system. - **Set rotation period > max consumer lag.** If your slowest consumer is 1 hour behind, rotation period must be > 1 hour. Otherwise rotation is blocked indefinitely and you lose pgque's zero-bloat advantage. - **Event tables are transient.** Don't rely on pgque for long-term event storage. Consume events and write to a permanent destination (data warehouse, object storage, etc.). TRUNCATE rotation means events are gone after the rotation period. - **TRUNCATE is instant.** Don't worry about event table size during a rotation period. 10 million events in a table? TRUNCATE takes <1ms regardless of table size. --- ## 12. Relationship to SPEC.md SPEC.md (v0.7.0-draft, 1616 lines) was written for the "pgque reimplementation" approach. 
It contains deep technical analysis that remains fully relevant to pgque: | SPEC.md Section | Topic | Relevance to pgque | |---|---|---| | 3.2.5 | INHERITS justification for rotation | Directly applicable -- pgque uses same mechanism | | 3.2.6 | Snapshot-based batch isolation, dual-filter algorithm | Core of pgque, unchanged | | 3.2.6 | Subtransaction caveats | Same caveats apply | | 3.2.7 | SECURITY DEFINER hardening rules | Applied in pgque Sprint 1 | | 3.3 | C-to-PL/pgSQL replacement analysis | Documents exactly what pgque inherits | | 3.4 | Performance expectations | Same numbers apply | | 4.3 | pg_cron integration design | pgque uses this design | | 4.4 | Rotation state machine with recovery rules | Core of pgque maintenance | | 4.4.1 | Tick cleanup invariant | Applied unchanged | | 7 | Risk table | Applicable risks carried forward | | 8 | Migration paths (PGMQ, River, pg-boss, etc.) | Directly applicable with schema `pgque` | | 9 | Best practices | Applicable with schema rename | | 10 | Benchmark methodology (incl. Brandur/PlanetScale MVCC analysis) | pgque benchmark plan | **What changes:** - SPEC.md section 1 describes a "reimplementation" -- pgque is a repackaging - SPEC.md section 5 (implementation plan) is replaced by this document's Sprint 1-6 - SPEC.md section 6 (staffing) needs revision (less work = smaller team) - SPEC.md section 11 (future work) -- several items are now in pgque v1 scope (DLQ, delayed events, metrics views) SPEC.md should be preserved alongside SPECx.md as the reference for PgQ's internal architecture. --- ## 13. Testing and Admin CLI ### 13.1 SQL Test Suite pgque ships a comprehensive SQL regression test suite modeled on PgQ's existing `sql/` + `expected/` structure. 
**Test categories:**

| Category | Tests |
|---|---|
| Core lifecycle | `create_queue`, `drop_queue`, `set_queue_config` |
| Event insertion | `insert_event`, `insert_event_raw`, `send`, `send_batch` |
| Ticker | Adaptive frequency, multi-queue, paused queues |
| Batch processing | `next_batch`, `get_batch_events`, `finish_batch`, cursor-based |
| Snapshot correctness | In-flight TX visibility, dual filter, cross-rotation batches |
| Rotation | step1/step2, blocked by slow consumer, concurrent inserts |
| Retry | `event_retry`, `maint_retry_events`, retry count tracking |
| DLQ | `nack` with max retries, `dlq_inspect`, `dlq_replay`, `dlq_purge` |
| Modern API | `send/receive/ack/nack`, delayed delivery, priority |
| Permissions | `pgque_reader`, `pgque_writer`, `pgque_admin` role enforcement |
| pg_cron integration | `start/stop/status`, idempotent start |
| Observability | `queue_stats`, `consumer_stats`, `queue_health`, `otel_metrics` |
| Triggers | `jsontriga` (INSERT/UPDATE/DELETE, pkey detection, ignore, backup) |
| Multi-PG version | PG 14, 15, 16, 17, 18 |

**Testing utilities** (available in pgque for user test suites):

```sql
-- Insert a test event and verify it arrives
pgque.test_send(queue text, payload jsonb) RETURNS bigint  -- event_id

-- Consume one event and verify content
pgque.test_consume(queue text, consumer text, expected_type text) RETURNS pgque.message

-- Assert queue is empty (no pending events for any consumer)
pgque.assert_empty(queue text) RETURNS boolean  -- raises exception if not empty

-- Assert DLQ is empty for a queue
pgque.assert_dlq_empty(queue text) RETURNS boolean  -- raises exception if not empty
```

### 13.2 Test Methodology: Red/Green TDD

All new pgque code (pgque-api layer, observability, client libraries) must be developed using **red/green TDD** where it makes sense:

1. **Red:** Write a failing test that defines the expected behavior
2. **Green:** Write the minimum code to make the test pass
3. **Refactor:** Clean up without changing behavior; tests stay green

**Where TDD applies:**

- All modern API functions (`send`, `receive`, `ack`, `nack`, DLQ, delayed)
- Observability functions (`queue_stats`, `consumer_stats`, `queue_health`)
- Client library producer/consumer classes
- CLI commands
- The `build/transform.sh` pipeline (test that output SQL is valid)

**Where TDD does not apply:**

- pgque-core repackaging (Sprint 1) -- PgQ already has tests; we run them after transformation and verify they pass. The tests exist before the code.
- Exploratory benchmarks -- these inform design, not verify correctness.

**Test-first discipline in SQL:**

```sql
-- Red: test that nack() moves event to DLQ after max retries
-- (write this BEFORE implementing nack)
do $$
declare
    v_msg pgque.message;
    v_dlq_count bigint;
begin
    -- Setup: queue with max_retries=2
    perform pgque.create_queue('test_dlq');
    perform pgque.set_queue_config('test_dlq', 'max_retries', '2');
    perform pgque.subscribe('test_dlq', 'c1');
    perform pgque.send('test_dlq', '{"x":1}'::jsonb);
    perform pgque.ticker();

    -- Simulate 2 prior retries (retry_count=2 >= max_retries=2)
    select * into v_msg from pgque.receive('test_dlq', 'c1', 1);
    -- Forge retry_count to simulate prior retries
    v_msg.retry_count := 2;
    perform pgque.nack(v_msg.batch_id, v_msg, '1 second', 'test failure');
    perform pgque.ack(v_msg.batch_id);

    -- Assert: event is in DLQ
    select count(*) into v_dlq_count
    from pgque.dead_letter
    where dl_queue_id = (select queue_id from pgque.queue
                         where queue_name = 'test_dlq');
    assert v_dlq_count = 1, 'expected 1 DLQ entry, got ' || v_dlq_count;

    -- Cleanup
    perform pgque.unsubscribe('test_dlq', 'c1');
    perform pgque.drop_queue('test_dlq');
end;
$$;
```

**Unit tests vs. integration tests:** The example above is a **unit test** -- it forges `retry_count` to isolate `nack()`'s DLQ routing logic. This proves the conditional works but does not test that `ev_retry` actually increments through the real retry flow (`event_retry_raw` → `maint_retry_events` → next batch delivery). Both are needed:

- **Unit test:** forge state, test one function's logic (fast, isolated)
- **Integration test:** full retry cycle (nack → ack → maint → ticker → receive), verify `retry_count` increments naturally (slower, end-to-end)

Write the unit test first (TDD red/green). Then write the integration test as an acceptance test (section 13.3, US-3). Both must pass.

### 13.3 User Stories and Acceptance Tests

These are end-to-end scenarios that verify pgque works as a complete system. They serve as both **CI acceptance tests** (automated) and **manual verification paths** for humans or AI agents testing a fresh deployment.

Each story follows a consistent structure: setup, action, verify, teardown.

#### US-1: Basic produce/consume cycle

**As a** developer, **I want to** send a JSON message and receive it, **so that** I can use pgque as a simple queue.

```
Setup:    install pgque, create queue "orders", subscribe consumer "app"
Action:   send('orders', '{"id":1}'), ticker(), receive('orders','app',10)
Verify:   exactly 1 message returned, payload = '{"id":1}', type = 'default'
          ack(batch_id) succeeds
          subsequent receive() returns empty set
Teardown: drop queue, uninstall
```

#### US-2: Multiple consumers (fan-out)

**As a** platform team, **I want** multiple independent consumers on one queue, **so that** analytics, notifications, and audit each process the same events at their own pace.
```
Setup:    create queue "events", subscribe "analytics", "notifier", "audit"
Action:   send 5 events, ticker()
Verify:   each consumer receives all 5 events independently
          acking one consumer does not affect the others
          consumer_stats() shows correct per-consumer lag
```

#### US-3: Retry and DLQ flow

**As a** developer, **I want** failed messages to retry automatically and land in a dead letter queue after max retries, **so that** transient failures are handled without manual intervention.

```
Setup:    create queue "jobs" with max_retries=2, subscribe "worker"
Action:   send event, ticker(), receive (retry_count=NULL, coalesced to 0)

          -- Retry cycle (each nack requires: nack → ack → maint → ticker → receive)
          Cycle 1: nack(msg) → ack(batch) → maint() → ticker() → receive
                   (retry_count=1, event_retry_raw incremented it)
          Cycle 2: nack(msg) → ack(batch) → maint() → ticker() → receive
                   (retry_count=2, now >= max_retries)
          Cycle 3: nack(msg) → retry_count=2 >= max_retries=2 → DLQ
                   ack(batch)
Verify:   event is in dead_letter table (not retried again)
          dlq_inspect() shows the event with reason
          dlq_replay() re-inserts it into the queue
          ticker(), receive() gets the replayed event (retry_count reset)
```

#### US-4: Delayed delivery

**As a** developer, **I want to** schedule a message for future delivery, **so that** I can implement reminders and scheduled tasks.

```
Setup:    create queue "reminders", subscribe "sender"
Action:   send_at('reminders', 'remind', payload, now() + '5 seconds')
Verify:   receive() returns empty immediately
          wait 5+ seconds, call maint() (which runs maint_deliver_delayed)
          ticker()
          receive() now returns the event
```

#### US-5: Batch processing under load

**As an** ETL pipeline, **I want to** process thousands of events per batch efficiently, **so that** I can keep up with high-throughput producers.
```
Setup:    create queue "ingest", subscribe "etl"
Action:   insert 10,000 events in a single transaction, ticker()
          receive('ingest', 'etl', 10000)
Verify:   all 10,000 events returned in one batch
          ack completes successfully
          queue_stats() shows depth=0
          no dead tuples in event tables (check pg_stat_user_tables)
```

#### US-6: Graceful rotation under consumer lag

**As an** operator, **I want** table rotation to work correctly even when a slow consumer is lagging, **so that** the system doesn't lose events.

```
Setup:    create queue "stream" with rotation_period='10 seconds'
          subscribe "fast" and "slow"
Action:   send events, ticker() repeatedly over 30+ seconds
          "fast" consumer: receive+ack every 2 seconds
          "slow" consumer: do not consume at all
Verify:   queue_health() shows 'warning' or 'critical' for "slow"
          rotation is blocked (cannot TRUNCATE tables "slow" reads from)
          "fast" consumer continues to receive normally
          once "slow" catches up and acks, rotation resumes
```

#### US-7: Transactional exactly-once processing

**As a** developer, **I want** my application writes and ack to be in the same transaction, **so that** a crash leaves the system consistent.

```
Setup:    create queue "payments", subscribe "processor"
          create table processed_payments (id int primary key)
Action:   send event with payment_id=42, ticker()
          BEGIN;
            receive();
            INSERT INTO processed_payments;
            ack();
          COMMIT;
Verify:   processed_payments contains payment_id=42
          receive() returns empty (batch finished)

          -- Crash simulation:
          send event with payment_id=43, ticker()
          BEGIN;
            receive();
            INSERT INTO processed_payments;
            -- NO COMMIT
          disconnect (simulates crash)
          reconnect, receive() returns payment_id=43 again (redelivered)
```

#### US-8: Install on managed PostgreSQL

**As a** developer on RDS/Cloud SQL/Supabase, **I want to** install pgque with a single SQL file and start it with pg_cron, **so that** I don't need DBA access or custom extensions.
```
Setup:    fresh managed PG database with pg_cron enabled
Action:   \i pgque.sql
          select pgque.start()
Verify:   pgque.status() shows ticker and maint running
          create queue, send, ticker, receive, ack all work
          pgque.queue_health() returns 'ok' for all checks
          pgque.stop() removes pg_cron jobs
          pgque.uninstall() removes all objects cleanly
```

#### US-9: Observability and health monitoring

**As an** operator, **I want** to quickly diagnose queue health and consumer lag, **so that** I can set up alerting and respond to issues.

```
Setup:    create 3 queues with varying load patterns
          one queue healthy, one with lagging consumer, one with DLQ entries
Action:   query queue_stats(), consumer_stats(), queue_health()
Verify:   queue_stats() shows correct depth, throughput, DLQ count
          consumer_stats() shows correct lag per consumer
          queue_health() returns 'ok', 'warning', 'critical' appropriately
          otel_metrics() returns rows with correct metric names and types
          stuck_consumers() identifies the lagging consumer
```

#### US-10: Idempotent install and upgrade

**As an** operator, **I want to** safely re-run the install script, **so that** upgrades and accidental re-runs don't break anything.

```
Setup:    install pgque, create queues, insert events, subscribe consumers
          note current queue depth and consumer positions
Action:   run \i pgque.sql again
Verify:   no errors
          existing queues and events are preserved (check depth matches)
          consumer positions are preserved (sub_last_tick unchanged)
          all functions work correctly after re-install
          send + ticker + receive + ack cycle works
```

**Implementation note:** This is the hardest test to get right. PgQ's original source uses plain `CREATE TABLE` / `CREATE FUNCTION` without idempotency guards. `build/transform.sh` must ensure the install script uses `CREATE TABLE IF NOT EXISTS`, `CREATE OR REPLACE FUNCTION`, and `CREATE SEQUENCE IF NOT EXISTS`. PostgreSQL has no `IF NOT EXISTS` clause for `CREATE TYPE`, so type creation must be wrapped in a guarded block (`DO $$ BEGIN ... EXCEPTION WHEN duplicate_object ...`).
The test must verify not just "no errors" but "data survives": specifically, that queue contents, consumer positions, tick history, config, and DLQ entries are all preserved across re-install. This is a Sprint 1 deliverable that deserves dedicated test coverage beyond this user story.

#### Running acceptance tests

**Automated (CI):** Each user story maps to a SQL test file in `tests/acceptance/`. CI runs them on PG 14-18 matrix. Tests are self-contained: setup, action, verify, teardown in a single file.

**Manual / AI agent verification:** The stories above are written so that a human or AI agent can execute them step-by-step against a live database using `psql`. The setup/action/verify/teardown structure makes each story independently executable and verifiable without special tooling.

### 13.4 Admin CLI

The `pgque` CLI is a thin wrapper around SQL calls, designed for operators and CI/CD pipelines. Written in Go (single binary, no dependencies).

```
pgque - PgQ Universal Edition administration tool

Usage:
  pgque <command> [flags]

Connection:
  --dsn, -d       PostgreSQL connection string (or PGQUE_DSN env var)
  --database      Database name (or PGDATABASE)

Commands:
  install         Install pgque schema into database
  upgrade         Upgrade pgque to latest version
  uninstall       Remove pgque schema (requires --force)

  start           Start pg_cron ticker and maintenance jobs
  stop            Stop pg_cron jobs
  status          Show system health, queue stats, consumer lag

  queues          List all queues with depth and health
  consumers       List all consumers with lag and status
  depth           Show queue depth over time (sparkline)

  drain           Wait until queue is empty or timeout
  replay-dlq      Replay dead letter events back into queue
  purge-dlq       Delete old dead letter events

  create-queue    Create a new queue
  drop-queue      Drop a queue (requires --force if consumers exist)
  pause           Pause a queue's ticker
  resume          Resume a queue's ticker

Examples:
  pgque install -d "postgresql://localhost/mydb"
  pgque start
  pgque status
  pgque queues
  pgque depth orders --watch 5s
  pgque consumers orders
  pgque replay-dlq orders --limit 100
  pgque drain orders --timeout 60s
```

**`pgque status` output example:**

```
pgque v1.0.0 | PostgreSQL 16.2 | pg_cron 1.6

System:
  Ticker:  running (job 42, every 2s, last run 1.2s ago)
  Maint:   running (job 43, every 30s, last run 12s ago)

Queues:
  NAME            DEPTH   RATE    ROTATION    CONSUMERS  DLQ  HEALTH
  orders          42      150/s   1h23m ago   3          0    ok
  notifications   831     45/s    0h47m ago   1          12   warning
  audit           0       0/s     1h59m ago   2          0    ok

Consumers:
  QUEUE           CONSUMER          LAG     PENDING  STATUS
  orders          order_processor   2.1s    42       ok
  orders          analytics         15.3s   412      ok
  orders          notifications     1m12s   1831     warning
  notifications   email_sender      3m45s   831      warning
  audit           compliance_log    0.5s    0        ok
```

---

## 14. Migration Paths

SPEC.md section 8 contains detailed migration tables for each alternative queue system. The mappings apply to pgque with these adjustments:

| SPEC.md reference | pgque equivalent |
|---|---|
| `pgque.insert_event(queue, type, data)` | `pgque.send(queue, type, payload)` or `pgque.insert_event(queue, type, data)` |
| `pgque.next_batch()` + `get_batch_events()` | `pgque.receive(queue, consumer, max_return)` |
| `pgque.finish_batch()` | `pgque.ack(batch_id)` |
| `pgque.event_retry()` | `pgque.nack(batch_id, msg, delay, reason)` |
| Retry queue with max-retry logic in consumer | `pgque.nack()` handles DLQ automatically |
| Schema `pgque` | Schema `pgque` |

### Quick migration reference

**From PgQ:** Schema rename (`pgq` -> `pgque`), remove pgqd, call `pgque.start()`. Consumer code structure is identical. See SPEC.md 8.1.

**From PGMQ:** Switch from per-message (`pgmq.read` + `pgmq.delete`) to batch model (`pgque.receive` + `pgque.ack`). The modern API makes this close to 1:1. See SPEC.md 8.2.

**From River / pg-boss / graphile-worker / Oban:** Switch from callback-per-job to batch processing. pgque client libraries provide typed dispatch that feels similar (`consumer.on("type", handler)`). The main conceptual shift is batch-oriented ack. See SPEC.md 8.3-8.6.

**From DIY SKIP LOCKED:** The strongest migration case.
pgque eliminates the MVCC dead tuple failure mode entirely (see SPEC.md 10.2 for the Brandur/PlanetScale analysis). See SPEC.md 8.7.

---

## Appendix A: PostgreSQL Version Support

| Version | Status | Notes |
|---------|--------|-------|
| PG 14 | Supported | Minimum. `pg_snapshot` functions available since PG13. |
| PG 15 | Supported | |
| PG 16 | Supported | |
| PG 17 | Supported | |
| PG 18 | Supported | |

## Appendix B: pg_cron Availability

| Provider | pg_cron | Notes |
|----------|---------|-------|
| Amazon RDS / Aurora | Yes | Since PG 12.5 |
| Google Cloud SQL | Yes | Requires flag |
| AlloyDB | Yes | Supported |
| Azure Flexible Server | Yes | Constrained permissions |
| Supabase | Yes | Pre-installed |
| Neon | Yes | Jobs only run when compute active |
| Crunchy Bridge | Yes | Supported |
| Self-hosted | Yes | Install separately |

Without pg_cron, pgque is fully functional -- the ticker and maintenance must be called from an external scheduler. See SPEC.md 4.3.

## Appendix C: Source File Inventory

PgQ PL-only source files that pgque repackages:

| File | Lines | Purpose |
|---|---|---|
| `structure/tables.sql` | 225 | Schema (all tables, sequences, constraints) |
| `lowlevel_pl/insert_event.sql` | 60 | PL/pgSQL event insertion (replaces C) |
| `lowlevel_pl/jsontriga.sql` | 318 | JSON CDC trigger (replaces C) |
| `lowlevel_pl/logutriga.sql` | 326 | URL-encoded CDC trigger (replaces C) |
| `lowlevel_pl/sqltriga.sql` | 363 | SQL fragment CDC trigger (replaces C) |
| `functions/pgq.ticker.sql` | 165 | Ticker with adaptive frequency |
| `functions/pgq.batch_event_sql.sql` | 133 | Snapshot-based batch query builder |
| `functions/pgq.batch_event_tables.sql` | 67 | Determines tables for a batch |
| `functions/pgq.create_queue.sql` | 81 | Queue creation |
| `functions/pgq.drop_queue.sql` | 82 | Queue deletion |
| `functions/pgq.register_consumer.sql` | 129 | Consumer registration |
| `functions/pgq.unregister_consumer.sql` | 78 | Consumer unregistration |
| `functions/pgq.next_batch.sql` | 219 | Batch acquisition (3 variants) |
| `functions/pgq.get_batch_events.sql` | 39 | Batch event retrieval |
| `functions/pgq.get_batch_cursor.sql` | 117 | Cursor-based batch retrieval |
| `functions/pgq.finish_batch.sql` | 36 | Batch completion |
| `functions/pgq.event_retry.sql` | 78 | Event retry |
| `functions/pgq.event_retry_raw.sql` | 67 | Low-level retry insertion |
| `functions/pgq.batch_retry.sql` | 53 | Batch-level retry |
| `functions/pgq.maint_rotate_tables.sql` | 119 | Two-phase table rotation |
| `functions/pgq.maint_retry_events.sql` | 45 | Retry event re-insertion |
| `functions/pgq.maint_operations.sql` | 129 | Maintenance orchestration |
| `functions/pgq.maint_tables_to_vacuum.sql` | 57 | Vacuum scheduling |
| `functions/pgq.set_queue_config.sql` | 59 | Runtime config changes |
| `functions/pgq.get_queue_info.sql` | 141 | Queue info |
| `functions/pgq.get_consumer_info.sql` | 135 | Consumer info |
| `functions/pgq.get_batch_info.sql` | 53 | Batch info |
| `functions/pgq.grant_perms.sql` | 99 | Permission management |
| `functions/pgq.tune_storage.sql` | 48 | Storage parameter tuning |
| `functions/pgq.force_tick.sql` | 49 | Force immediate tick |
| `functions/pgq.find_tick_helper.sql` | 78 | Custom tick finding |
| `functions/pgq.seq_funcs.sql` | 65 | Sequence utilities |
| `functions/pgq.quote_fqname.sql` | 36 | Name quoting |
| `functions/pgq.current_event_table.sql` | 44 | Current table lookup |
| `functions/pgq.upgrade_schema.sql` | 49 | Schema migration |
| `functions/pgq.version.sql` | 16 | Version string |
| `structure/grants.sql` | 13 | Default grants |
| **Total** | **3,871** | |

New code added by pgque (estimated):

| Component | Estimated Lines |
|---|---|
| Lifecycle (start/stop/status/uninstall) | 200-300 |
| Modern API (send/receive/ack/nack/subscribe) | 300-400 |
| DLQ (table + functions) | 150-200 |
| Delayed events (table + functions) | 100-150 |
| Observability (stats/health/otel) | 400-600 |
| Testing utilities | 100-150 |
| **Total new PL/pgSQL** | **~1,250-1,800** |