# Hoppity — LLM Code Generation Reference

Contract-driven RabbitMQ topology builder for Node.js microservices, built on Rascal. Handlers and publish declarations ARE the topology.

You declare what your service handles and what it sends. Hoppity derives all exchanges, queues, bindings, publications, and subscriptions automatically. No topology files. No manual Rascal config.

## Package Overview

| Package | npm Name | Version | Status | Dependencies |
|---------|----------|---------|--------|--------------|
| hoppity | `@apogeelabs/hoppity` | 1.0.0 | Stable | `rascal@^20.1.1`, `fast-deep-equal@^3.1.3`, `zod@^3.x` |
| hoppity-open-telemetry | `@apogeelabs/hoppity-open-telemetry` | 0.1.0 | Stable | `@apogeelabs/hoppity` (workspace), `@opentelemetry/api@^1.9.0` (peer) |

All packages require Node >= 22. All produce CJS, ESM, and type declarations.

## Import Paths

Everything comes from `@apogeelabs/hoppity`. There are no sub-module imports.

```typescript
import hoppity, {
    defineDomain,
    onEvent,
    onCommand,
    onRpc,
} from "@apogeelabs/hoppity";

import type {
    ServiceBroker,
    ServiceConfig,
    ConnectionConfig,
    EventContract,
    CommandContract,
    RpcContract,
    DomainDefinition,
    HandlerDeclaration,
    HandlerOptions,
    MiddlewareFunction,
    MiddlewareResult,
    MiddlewareContext,
    BrokerCreatedCallback,
    Logger,
    RpcRequest,
    RpcResponse,
    RpcErrorCodeValue,
    Interceptor,
    InboundWrapper,
    OutboundWrapper,
    InboundMetadata,
    OutboundMetadata,
} from "@apogeelabs/hoppity";

import { RpcErrorCode, RpcError, ConsoleLogger, defaultLogger } from "@apogeelabs/hoppity";
```

## Type Signatures

### Core Types (`@apogeelabs/hoppity`)

#### Logger

```typescript
interface Logger {
    silly(message: string, ...args: any[]): void;
    debug(message: string, ...args: any[]): void;
    info(message: string, ...args: any[]): void;
    warn(message: string, ...args: any[]): void;
    error(message: string, ...args: any[]): void;
    critical(message: string, ...args: any[]): void;
}
```

#### MiddlewareContext

```typescript
interface MiddlewareContext {
    data: Record<string, any>; // Mutable shared state between middleware
    middlewareNames: string[]; // Names of middleware that have already executed
    logger: Logger; // Logger instance (set by ServiceConfig.logger or defaultLogger)
    serviceName?: string; // Populated by ServiceBuilder from hoppity.service(name, ...)
}
```

#### MiddlewareFunction / MiddlewareResult / BrokerCreatedCallback

```typescript
type MiddlewareFunction = (
    topology: BrokerConfig,
    context: MiddlewareContext
) => MiddlewareResult;

interface MiddlewareResult {
    topology: BrokerConfig;
    onBrokerCreated?: BrokerCreatedCallback;
}

type BrokerCreatedCallback = (broker: BrokerAsPromised) => void | Promise<void>;
```

#### BrokerWithExtensions

Utility type for combining a broker with middleware extension methods (e.g., `delayedPublish`):

```typescript
type BrokerWithExtensions<T extends Record<string, any>[]> = BrokerAsPromised &
    UnionToIntersection<T[number]>;

// Usage:
type MyBroker = BrokerWithExtensions<[{ customMethod(): void }]>;
```

---

### Contract Types

#### EventContract

```typescript
interface EventContract<
    TDomain extends string = string,
    TName extends string = string,
    TSchema extends ZodTypeAny = ZodTypeAny,
> {
    _type: "event";
    _domain: TDomain;
    _name: TName;
    schema: TSchema;
    exchange: string; // "{domain}" — shared topic exchange for events + commands
    routingKey: string; // "{domain}.event.{snake_name}"
    publicationName: string; // "{domain}_event_{snake_name}"
    subscriptionName: string; // "{domain}_event_{snake_name}"
}
```

#### CommandContract

```typescript
interface CommandContract<
    TDomain extends string = string,
    TName extends string = string,
    TSchema extends ZodTypeAny = ZodTypeAny,
> {
    _type: "command";
    _domain: TDomain;
    _name: TName;
    schema: TSchema;
    exchange: string; // "{domain}" — shared with events
    routingKey: string; // "{domain}.command.{snake_name}"
    publicationName: string; // "{domain}_command_{snake_name}"
    subscriptionName: string; // "{domain}_command_{snake_name}"
}
```

#### RpcContract

```typescript
interface RpcContract<
    TDomain extends string = string,
    TName extends string = string,
    TRequest extends ZodTypeAny = ZodTypeAny,
    TResponse extends ZodTypeAny = ZodTypeAny,
> {
    _type: "rpc";
    _domain: TDomain;
    _name: TName;
    requestSchema: TRequest;
    responseSchema: TResponse;
    exchange: string; // "{domain}_rpc" — separate exchange for RPC
    routingKey: string; // "{domain}.rpc.{snake_name}"
    publicationName: string; // "{domain}_rpc_{snake_name}"
    subscriptionName: string; // "{domain}_rpc_{snake_name}"
}
```

#### DomainDefinition

```typescript
interface DomainDefinition<
    TDomain extends string = string,
    TEvents extends EventsDefinition = EventsDefinition,
    TCommands extends CommandsDefinition = CommandsDefinition,
    TRpc extends RpcDefinition = RpcDefinition,
> {
    domain: TDomain;
    events: { [K in keyof TEvents]: EventContract };
    commands: { [K in keyof TCommands]: CommandContract };
    rpc: { [K in keyof TRpc]: RpcContract };
}
```

#### DomainDefinitionInput

```typescript
interface DomainDefinitionInput<
    TEvents extends EventsDefinition = EventsDefinition,
    TCommands extends CommandsDefinition = CommandsDefinition,
    TRpc extends RpcDefinition = RpcDefinition,
> {
    events?: TEvents;
    commands?: TCommands;
    rpc?: TRpc;
}

// Each operation accepts a bare Zod schema OR an extended object:
type EventOperationInput = ZodTypeAny | { schema: ZodTypeAny; [key: string]: any };
type CommandOperationInput = ZodTypeAny | { schema: ZodTypeAny; [key: string]: any };
type RpcOperationInput =
    | { request: ZodTypeAny; response: ZodTypeAny }
    | { schema: { request: ZodTypeAny; response: ZodTypeAny }; [key: string]: any };
```

#### HandlerOptions

```typescript
interface HandlerOptions {
    queueType?: "quorum" | "classic"; // defaults to "quorum"
    redeliveries?: { limit: number }; // defaults to { limit: 5 }
    deadLetter?: {
        exchange: string;
        routingKey?: string;
    };
}
```

---

### Handler Types

#### Handler Function Signatures

```typescript
// Content type is inferred from the contract's schema via z.infer
type EventHandler<TSchema extends ZodTypeAny = ZodTypeAny> = (
    content: z.infer<TSchema>,
    context: HandlerContext
) => Promise<void> | void;

type CommandHandler<TSchema extends ZodTypeAny = ZodTypeAny> = (
    content: z.infer<TSchema>,
    context: HandlerContext
) => Promise<void> | void;

// RPC handlers must be async and return the response type
type RpcHandler<
    TRequest extends ZodTypeAny = ZodTypeAny,
    TResponse extends ZodTypeAny = ZodTypeAny,
> = (
    request: z.infer<TRequest>,
    context: HandlerContext
) => Promise<z.infer<TResponse>>;
```

#### HandlerContext

```typescript
interface HandlerContext {
    broker: HandlerContextBroker; // Typed broker for outbound operations within handlers
}

// HandlerContextBroker includes: publishEvent, sendCommand, request, cancelRequest
// (same signatures as ServiceBroker — see below)
```

#### HandlerDeclaration (union)

```typescript
interface EventHandlerDeclaration {
    _kind: "event";
    contract: EventContract;
    handler: EventHandler;
    options?: HandlerOptions;
}

interface CommandHandlerDeclaration {
    _kind: "command";
    contract: CommandContract;
    handler: CommandHandler;
    options?: HandlerOptions;
}

interface RpcHandlerDeclaration {
    _kind: "rpc";
    contract: RpcContract;
    handler: RpcHandler;
    options?: HandlerOptions;
}

type HandlerDeclaration =
    | EventHandlerDeclaration
    | CommandHandlerDeclaration
    | RpcHandlerDeclaration;
```

---

### Service Configuration

#### ConnectionConfig

```typescript
interface ConnectionConfig {
    url: string; // e.g., "amqp://localhost" or "amqp://user:pass@host:5672"
    vhost?: string; // defaults to "/"
    options?: Record<string, any>; // e.g., { heartbeat: 10 }
    retry?: {
        factor?: number; // backoff multiplier
        min?: number; // minimum retry delay in ms
        max?: number; // maximum retry delay in ms
    };
}
```

#### ServiceConfig

```typescript
interface ServiceConfig {
    connection: ConnectionConfig;
    handlers?: HandlerDeclaration[]; // defaults to []
    publishes?: (EventContract | CommandContract | RpcContract)[]; // defaults to []
    topology?: BrokerConfig; // Optional raw Rascal config — merged as base before derived topology
    instanceId?: string; // Auto-generated UUID if not provided
    defaultTimeout?: number; // RPC timeout in ms (defaults to 30_000)
    validateInbound?: boolean; // Validate incoming payloads against schemas (defaults to true)
    validateOutbound?: boolean; // Validate outgoing payloads against schemas (defaults to false)
    interceptors?: Interceptor[]; // Per-message wrappers — tracing, metrics, header injection
    logger?: Logger; // Custom logger — replaces defaultLogger for the entire build pipeline
    delayedDelivery?: {
        maxRetries?: number; // Max re-publish attempts before error queue (default 5)
        retryDelay?: number; // ms between retry attempts (default 1000)
    };
}
```

---

### ServiceBroker

Returned by `ServiceBuilder.build()`. Extends `BrokerAsPromised` with typed outbound methods.

```typescript
interface ServiceBroker extends BrokerAsPromised {
    publishEvent<TSchema extends ZodTypeAny>(
        contract: EventContract<string, string, TSchema>,
        message: z.infer<TSchema>,
        overrides?: PublicationConfig & { delay?: number | true }
    ): Promise<void>;

    sendCommand<TSchema extends ZodTypeAny>(
        contract: CommandContract<string, string, TSchema>,
        message: z.infer<TSchema>,
        overrides?: PublicationConfig & { delay?: number | true }
    ): Promise<void>;

    request<TRequest extends ZodTypeAny, TResponse extends ZodTypeAny>(
        contract: RpcContract<string, string, TRequest, TResponse>,
        message: z.infer<TRequest>,
        overrides?: PublicationConfig
    ): Promise<z.infer<TResponse>>;

    cancelRequest(correlationId: string): boolean;
}
```

---

### RPC Wire Types

These describe the envelope formats used on the wire. Most callers don't interact with them directly — they're relevant when building custom RPC infrastructure or debugging.

```typescript
interface RpcRequest {
    correlationId: string;
    rpcName: string; // "{domain}.{operationName}"
    payload: any;
    replyTo: string; // reply queue name for this instance
    headers?: Record<string, any>;
}

interface RpcResponse {
    correlationId: string;
    payload?: any;
    error?: {
        code: string;
        message: string;
        details?: any;
    };
}

const RpcErrorCode = {
    HANDLER_ERROR: "RPC_HANDLER_ERROR",
    TIMEOUT: "RPC_TIMEOUT",
    CANCELLED: "RPC_CANCELLED",
} as const;

type RpcErrorCodeValue = "RPC_HANDLER_ERROR" | "RPC_TIMEOUT" | "RPC_CANCELLED";

class RpcError extends Error {
    readonly code: RpcErrorCodeValue | string;
    constructor(message: string, code: RpcErrorCodeValue | string);
}
```

---

### Delayed Delivery

Delayed delivery is built into core. Declare `delay` on contracts in `defineDomain`, then pass `{ delay: N }` in publish overrides.

#### Declaring delay support on contracts

```typescript
const OrdersDomain = defineDomain("orders", {
    events: {
        orderCreated: {
            schema: z.object({ orderId: z.string() }),
            delay: true, // enables delay with no default
        },
        reminderSent: {
            schema: z.object({ orderId: z.string() }),
            delay: { default: 60000 }, // enables delay with 60s default
        },
    },
});
```

#### Publishing with delay

```typescript
// Use the contract's default delay
await broker.publishEvent(OrdersDomain.events.reminderSent, { orderId }, { delay: true });

// Override with a specific delay in ms
await broker.publishEvent(OrdersDomain.events.reminderSent, { orderId }, { delay: 30000 });
```

#### Configuring retry behavior

```typescript
const broker = await hoppity
    .service("order-service", {
        connection: { url: "amqp://localhost" },
        handlers: [onEvent(OrdersDomain.events.reminderSent, handleReminder)],
        publishes: [OrdersDomain.events.reminderSent],
        delayedDelivery: {
            maxRetries: 5, // re-publish attempts before error queue (default 5)
            retryDelay: 1000, // ms between retries (default 1000)
        },
    })
    .build();
```

#### Delayed delivery types

```typescript
interface DelayedDeliveryEnvelope {
    originalMessage: any;
    originalPublication: string;
    originalOverrides?: PublicationConfig;
    targetDelay: number; // ms — used as per-message TTL on the wait queue
    createdAt: number; // Unix timestamp (ms) when the delayed publish was initiated
    retryCount: number; // starts at 0
}

enum DelayedDeliveryErrorCode {
    INVALID_DELAY = "DELAYED_DELIVERY_INVALID_DELAY",
    QUEUE_FULL = "DELAYED_DELIVERY_QUEUE_FULL",
    ERROR_QUEUE_PUBLISH_FAILED = "DELAYED_DELIVERY_ERROR_QUEUE_PUBLISH_FAILED",
    MAX_RETRIES_EXCEEDED = "DELAYED_DELIVERY_MAX_RETRIES_EXCEEDED",
    RETRY_ENQUEUE_FAILED = "DELAYED_DELIVERY_RETRY_ENQUEUE_FAILED",
}

class DelayedDeliveryError extends Error {
    readonly code: DelayedDeliveryErrorCode;
    readonly details?: any;
    constructor(code: DelayedDeliveryErrorCode, message: string, details?: any);
}

type DelayConfig = true | { default: number };
```

#### Delayed delivery queue naming

| Artifact | Pattern | Example |
|----------|---------|---------|
| Wait queue | `{domain}_{type}_{snake_name}_wait` | `orders_event_reminder_sent_wait` |
| Ready queue | `{domain}_{type}_{snake_name}_ready` | `orders_event_reminder_sent_ready` |
| Error queue | `{domain}_{type}_{snake_name}_errors` | `orders_event_reminder_sent_errors` |
| Wait publication | `{domain}_{type}_{snake_name}_delayed` | `orders_event_reminder_sent_delayed` |

---

### ServiceBuilder

```typescript
class ServiceBuilder {
    constructor(serviceName: string, config: ServiceConfig);

    /** Adds middleware to the pipeline. Chainable. */
    use(middleware: MiddlewareFunction): ServiceBuilder;

    /** Builds the broker through all 7 phases. Returns a wired ServiceBroker. */
    build(): Promise<ServiceBroker>;
}
```

### Hoppity (entry point)

```typescript
interface Hoppity {
    service(serviceName: string, config: ServiceConfig): ServiceBuilder;
}

// Default export:
const hoppity: Hoppity;
export default hoppity;
```

---

## Function Signatures

### `defineDomain`

```typescript
function defineDomain<
    TDomain extends string,
    TEvents extends EventsDefinition,
    TCommands extends CommandsDefinition,
    TRpc extends RpcDefinition,
>(
    domainName: TDomain,
    definition: DomainDefinitionInput<TEvents, TCommands, TRpc>
): DomainDefinition<TDomain, TEvents, TCommands, TRpc>;
```

- Throws if `domainName` is empty or whitespace-only.
- All sections (`events`, `commands`, `rpc`) are optional — omit any that don't apply.
- Returns typed contract objects for all declared operations.

### `onEvent`

```typescript
function onEvent(
    contract: EventContract,
    handler: EventHandler,
    options?: HandlerOptions
): EventHandlerDeclaration;
```

### `onCommand`

```typescript
function onCommand(
    contract: CommandContract,
    handler: CommandHandler,
    options?: HandlerOptions
): CommandHandlerDeclaration;
```

### `onRpc`

```typescript
function onRpc(
    contract: RpcContract,
    handler: RpcHandler,
    options?: HandlerOptions
): RpcHandlerDeclaration;
```

---

## Naming Conventions

Topology artifact names are derived mechanically. You don't set them — the contracts carry them.

| Artifact | Pattern | Example |
|----------|---------|---------|
| Exchange (event/command) | `{domain}` | `orders` |
| Exchange (rpc) | `{domain}_rpc` | `orders_rpc` |
| Routing key (event) | `{domain}.event.{snake_name}` | `orders.event.order_created` |
| Routing key (command) | `{domain}.command.{snake_name}` | `orders.command.cancel_order` |
| Routing key (rpc) | `{domain}.rpc.{snake_name}` | `orders.rpc.create_order` |
| Queue | `{service}_{domain}_{type}_{snake_name}` | `catalog-service_orders_event_order_created` |
| Publication name | `{domain}_{type}_{snake_name}` | `orders_event_order_created` |
| Subscription name | `{domain}_{type}_{snake_name}` | `orders_event_order_created` |
| Reply queue | `{service}_{instanceId}_reply` | `order-service_abc123_reply` |

camelCase operation names are converted to snake_case. Acronyms collapse to a single segment: `getHTTPResponse` → `get_http_response`.

---

## Complete Usage Example

```typescript
import { z } from "zod";
import hoppity, { defineDomain, onEvent, onCommand, onRpc } from "@apogeelabs/hoppity";

// NOTE: orderStore, logger, and correlationId are assumed to be defined
// elsewhere in the application; they are not provided by hoppity.

// --- Define the domain ---

const LineItemSchema = z.object({
    productId: z.string(),
    quantity: z.number(),
    price: z.number(),
});

const OrdersDomain = defineDomain("orders", {
    events: {
        orderCreated: z.object({
            orderId: z.string(),
            items: z.array(LineItemSchema),
        }),
        orderCancelled: z.object({
            orderId: z.string(),
            reason: z.string(),
        }),
    },
    commands: {
        cancelOrder: z.object({ orderId: z.string() }),
    },
    rpc: {
        createOrder: {
            request: z.object({ items: z.array(LineItemSchema) }),
            response: z.object({ orderId: z.string(), total: z.number() }),
        },
        getOrderSummary: {
            request: z.object({ orderId: z.string() }),
            response: z.object({ orderId: z.string(), status: z.string(), total: z.number() }),
        },
    },
});

// --- Define handlers ---

const createOrderHandler = onRpc(
    OrdersDomain.rpc.createOrder,
    async (request, { broker }) => {
        const order = await orderStore.create(request);
        await broker.publishEvent(OrdersDomain.events.orderCreated, {
            orderId: order.id,
            items: order.items,
        });
        return { orderId: order.id, total: order.total };
    }
);

const cancelOrderHandler = onCommand(
    OrdersDomain.commands.cancelOrder,
    async ({ orderId }, { broker }) => {
        const order = await orderStore.cancel(orderId);
        await broker.publishEvent(OrdersDomain.events.orderCancelled, {
            orderId: order.id,
            reason: "Requested by customer",
        });
    },
    {
        queueType: "quorum",
        redeliveries: { limit: 10 },
        deadLetter: { exchange: "order-service-dlx" },
    }
);

// --- Build the service ---

const broker = await hoppity
    .service("order-service", {
        connection: {
            url: process.env.RABBITMQ_URL ?? "amqp://localhost",
            vhost: "/",
            options: { heartbeat: 10 },
            retry: { factor: 2, min: 1000, max: 5000 },
        },
        handlers: [
            createOrderHandler,
            cancelOrderHandler,
        ],
        publishes: [
            OrdersDomain.events.orderCreated,
            OrdersDomain.events.orderCancelled,
        ],
        logger,
    })
    .build();

// --- Use the broker ---

await broker.publishEvent(OrdersDomain.events.orderCreated, {
    orderId: "ord-123",
    items: [{ productId: "prod-1", quantity: 2, price: 19.99 }],
});

const order = await broker.request(OrdersDomain.rpc.createOrder, {
    items: [{ productId: "prod-1", quantity: 1, price: 9.99 }],
});
// order is typed as { orderId: string; total: number }

const cancelled = broker.cancelRequest(correlationId); // boolean

await broker.shutdown();
```

---

## RPC-Only Caller Example

A service that calls RPC but doesn't handle any RPC operations itself:

```typescript
const broker = await hoppity
    .service("gateway-service", {
        connection: { url: "amqp://localhost" },
        publishes: [
            OrdersDomain.rpc.createOrder, // declares outbound RPC call
            OrdersDomain.rpc.getOrderSummary,
        ],
        // no handlers — this service only calls, never responds
    })
    .build();

const result = await broker.request(OrdersDomain.rpc.createOrder, { items });
```

---

## Escape Hatch: Raw Topology Only

For services not using contracts, or for one-off infrastructure that can't be derived:

```typescript
const broker = await hoppity
    .service("legacy-service", {
        connection: { url: "amqp://localhost" },
        topology: existingRascalConfig, // raw BrokerConfig, merged as base
        // handlers and publishes default to [] — no topology derivation
    })
    .build();
```

You can also combine raw topology with derived topology. The raw config is the base; derived topology layers on top:

```typescript
const broker = await hoppity
    .service("order-service", {
        connection: { url: "amqp://localhost" },
        handlers: [cancelOrderHandler],
        publishes: [OrdersDomain.events.orderCancelled],
        topology: {
            // DLX exchange not derived automatically — add it manually
            vhosts: {
                "/": {
                    exchanges: { "order-service-dlx": { type: "topic" } },
                },
            },
        },
    })
    .build();
```

---

## Middleware API

Middleware functions see the complete derived + merged topology before the broker is created. They can modify it further and optionally register a post-creation callback.

```typescript
const auditMiddleware: MiddlewareFunction = (topology, context) => {
    context.logger.info(`[audit] Building broker for service: ${context.serviceName}`);
    context.data.auditEnabled = true;

    return {
        topology, // pass through unchanged
        onBrokerCreated: async (broker) => {
            // broker is fully wired here — handlers and outbound methods are attached
            context.logger.info("[audit] Broker is live");
        },
    };
};

const broker = await hoppity
    .service("my-service", {
        connection: { url: "amqp://localhost" },
        logger, // active before any middleware runs — no ordering footgun
    })
    .use(auditMiddleware)
    .build();
```

---

## Interceptor API

Interceptors wrap handler execution (inbound) and publish calls (outbound) at runtime. They are the extension point for tracing, metrics, and any cross-cutting concern that needs to observe or modify per-message processing.

Unlike middleware (which runs once at build time), interceptors run on every message.

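
As a rough mental model of what "wrapping" means here, the sketch below composes two wrappers around a handler using simplified local types. `Handler`, `Wrapper`, `composeWrappers`, and `tag` are hypothetical stand-ins written for this illustration, not hoppity's actual types or internals; the real wrappers also receive per-message metadata. The fold direction is chosen so that the first wrapper in the array ends up outermost, which matches the ordering described under Composition Order.

```typescript
// Simplified stand-ins for illustration only; NOT hoppity's real types.
type Handler = (payload: unknown) => Promise<unknown>;
type Wrapper = (next: Handler) => Handler;

// Hypothetical helper: fold right-to-left so the first wrapper is outermost.
function composeWrappers(wrappers: Wrapper[], handler: Handler): Handler {
    return wrappers.reduceRight((next, wrap) => wrap(next), handler);
}

// Runs one message through [A, B] and returns the order in which code executed.
async function demo(): Promise<string[]> {
    const calls: string[] = [];

    // Builds a wrapper that records when it is entered and exited.
    const tag = (name: string): Wrapper => (next) => async (payload) => {
        calls.push(`${name}:before`);
        try {
            return await next(payload);
        } finally {
            calls.push(`${name}:after`);
        }
    };

    const wrapped = composeWrappers([tag("A"), tag("B")], async (payload) => {
        calls.push("handler");
        return payload;
    });

    await wrapped({ orderId: "ord-123" });
    return calls;
}

demo().then((calls) => console.log(calls.join(" -> ")));
// prints: A:before -> B:before -> handler -> B:after -> A:after
```

Each wrapper sees the call before the handler runs and again after it settles, which is why timing, tracing, and error counting fit naturally into this shape.
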
### Interceptor Type Definitions

```typescript
/**
 * A unified interceptor that can wrap inbound handler execution,
 * outbound publish calls, or both. Either direction is optional —
 * an interceptor with only `inbound` or only `outbound` is valid.
 *
 * Interceptors are configuration, not runtime state. They are wired
 * at build time and cannot be added or removed after the broker is created.
 */
interface Interceptor {
    /** Name for logging and debugging — required, must be non-empty */
    name: string;
    /** Wraps handler execution for events, commands, and RPC responders */
    inbound?: InboundWrapper;
    /** Wraps publish calls for publishEvent, sendCommand, and request */
    outbound?: OutboundWrapper;
}

/**
 * Wraps a handler function. Receives the original handler and per-message
 * metadata. Returns a replacement handler with the same signature.
 *
 * Metadata is built per-message because headers vary per message.
 */
type InboundWrapper = (
    handler: (payload: any, context: HandlerContext) => Promise<any>,
    metadata: InboundMetadata
) => (payload: any, context: HandlerContext) => Promise<any>;

/**
 * Wraps a publish function. Receives the inner publish and metadata about
 * the contract being published to. Returns a replacement publish with the
 * same signature.
 */
type OutboundWrapper = (
    publish: (message: any, overrides?: PublicationConfig) => Promise<any>,
    metadata: OutboundMetadata
) => (message: any, overrides?: PublicationConfig) => Promise<any>;

/**
 * Metadata available to inbound wrappers when a message is received.
 */
interface InboundMetadata {
    /** The contract this handler is bound to */
    contract: EventContract | CommandContract | RpcContract;
    /** Operation kind */
    kind: "event" | "command" | "rpc";
    /** The service name from hoppity.service() */
    serviceName: string;
    /** AMQP message surface — headers for trace context extraction, properties for message metadata */
    message: {
        headers: Record<string, any>;
        properties: Record<string, any>;
    };
}

/**
 * Metadata available to outbound wrappers when a message is published.
 * Constructed per-call because the contract is only known at call time.
 */
interface OutboundMetadata {
    /** The contract being published to */
    contract: EventContract | CommandContract | RpcContract;
    /** Operation kind */
    kind: "event" | "command" | "rpc";
    /** The service name from hoppity.service() */
    serviceName: string;
}
```

### Configuration

Pass interceptors in `ServiceConfig.interceptors`. They apply to all handlers and all outbound calls for the service.

```typescript
const broker = await hoppity
    .service("order-service", {
        connection: { url: process.env.RABBITMQ_URL! },
        handlers: [createOrderHandler, cancelOrderHandler],
        publishes: [OrdersDomain.events.orderCreated],
        interceptors: [withTracing, withMetrics],
    })
    .build();
```

### Composition Order

For `interceptors: [A, B]`, the call chain is `A → B → handler` (inbound) and `A → B → rascal publish` (outbound). First interceptor declared is outermost — it sees the call first on the way in and last on the way out.

Inbound wrappers are composed per-message (metadata includes per-message headers). Outbound wrappers are composed per-publish-call (metadata derives from the contract argument). The per-call overhead is negligible — wrapping 1-3 functions is nanoseconds relative to AMQP I/O.

### One-Directional Interceptors

An interceptor that only declares `inbound` or only `outbound` is valid. The framework skips the missing direction.

```typescript
// Inbound-only: log handler execution time
const withHandlerTiming: Interceptor = {
    name: "handler-timing",
    inbound: (handler, meta) => async (payload, ctx) => {
        const start = performance.now();
        try {
            return await handler(payload, ctx);
        } finally {
            console.log(`${meta.contract._name} took ${performance.now() - start}ms`);
        }
    },
};

// Outbound-only: inject correlation headers on every publish
const withCorrelationHeaders: Interceptor = {
    name: "correlation-headers",
    outbound: (publish, meta) => async (message, overrides) => {
        return publish(message, {
            ...overrides,
            options: {
                ...overrides?.options,
                headers: {
                    ...overrides?.options?.headers,
                    "x-source-service": meta.serviceName,
                    "x-source-domain": meta.contract._domain,
                },
            },
        });
    },
};
```

### RPC and Interceptors

Both directions apply to RPC:

- **Inbound (responder):** RPC handlers are wrapped identically to event/command handlers. `meta.kind` is `"rpc"`. The wrapper receives the unwrapped request payload (after `RpcRequest` envelope extraction).
- **Outbound (caller):** `broker.request()` is wrapped identically to `publishEvent`/`sendCommand`. `meta.kind` is `"rpc"`.

RPC response processing (reply queue subscription, correlation resolution) is not intercepted — that is internal framework plumbing.

### `hoppity-open-telemetry` Package

```typescript
import { withTracing, withMetrics } from "@apogeelabs/hoppity-open-telemetry";
import type { TracingOptions, MetricsOptions } from "@apogeelabs/hoppity-open-telemetry";
```

Both `withTracing` and `withMetrics` are dual-use: they can be placed in the `interceptors` array directly (uses defaults) or called as a factory to supply options.

```typescript
// Direct use — default tracer/meter name "hoppity"
interceptors: [withTracing, withMetrics]

// Factory use — custom names and options
interceptors: [
    withTracing({ tracerName: "order-service", spanPrefix: "msg" }),
    withMetrics({ meterName: "order-service", histogramBuckets: [5, 10, 25, 50, 100, 250] }),
]
```

#### `TracingOptions`

```typescript
interface TracingOptions {
    /**
     * Name passed to opentelemetry.trace.getTracer().
     * Defaults to "hoppity".
     */
    tracerName?: string;
    /**
     * Prefix prepended to span names, separated by a colon.
     * Defaults to the operation kind ("event", "command", "rpc", or "publish").
     * Supply this to namespace spans when multiple services share a tracer.
     */
    spanPrefix?: string;
}
```

#### `MetricsOptions`

```typescript
interface MetricsOptions {
    /**
     * Name passed to opentelemetry.metrics.getMeter().
     * Defaults to "hoppity".
     */
    meterName?: string;
    /**
     * Explicit histogram bucket boundaries for duration histograms (milliseconds).
     * Omit to use the OTel SDK default boundaries.
     */
    histogramBuckets?: number[];
}
```

#### `withTracing` behaviour

- **Inbound:** extracts parent trace context from `meta.message.headers` via `propagation.extract()`. Starts an active span named `{spanPrefix}:{domain}.{operationName}` where `spanPrefix` defaults to `meta.kind` (e.g. `"event"`, `"command"`, `"rpc"`). Sets semantic attributes (see table below). Records exceptions and sets `SpanStatusCode.ERROR` on failure; sets `SpanStatusCode.OK` on success.
- **Outbound:** starts an active span named `{spanPrefix}:{domain}.{operationName}` where `spanPrefix` defaults to `"publish"`. Injects the current trace context into AMQP message headers via `propagation.inject()` so downstream consumers can link their spans as children. Records exceptions on failure.

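
The span naming rule above can be sketched as a small pure function. This is only an illustration of the documented pattern: `spanName`, `ContractRef`, and `Direction` are hypothetical names invented here and are not exported by `@apogeelabs/hoppity-open-telemetry`.

```typescript
type Kind = "event" | "command" | "rpc";
type Direction = "inbound" | "outbound";

// Only the two contract fields the span name depends on.
interface ContractRef {
    _domain: string;
    _name: string;
}

// Hypothetical helper mirroring the documented rule:
// "{spanPrefix}:{domain}.{operationName}", where the prefix defaults to the
// operation kind for inbound spans and to "publish" for outbound spans.
function spanName(
    direction: Direction,
    kind: Kind,
    contract: ContractRef,
    spanPrefix?: string
): string {
    const prefix = spanPrefix ?? (direction === "inbound" ? kind : "publish");
    return `${prefix}:${contract._domain}.${contract._name}`;
}

const orderCreated: ContractRef = { _domain: "orders", _name: "orderCreated" };
const createOrder: ContractRef = { _domain: "orders", _name: "createOrder" };

console.log(spanName("inbound", "event", orderCreated)); // event:orders.orderCreated
console.log(spanName("outbound", "event", orderCreated)); // publish:orders.orderCreated
console.log(spanName("inbound", "rpc", createOrder, "msg")); // msg:orders.createOrder
```

Note that the operation name is used in its camelCase contract form (`_name`), not the snake_case form used for routing keys.
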
Span name examples with defaults:

- `"event:orders.orderCreated"`
- `"command:orders.cancelOrder"`
- `"rpc:orders.createOrder"`
- `"publish:orders.orderCreated"`

#### `withMetrics` behaviour

- **Inbound:** increments `hoppity.handler.count`, records `hoppity.handler.duration` histogram (ms) in `finally`, increments `hoppity.handler.errors` on failure.
- **Outbound:** increments `hoppity.publish.count`, records `hoppity.publish.duration` histogram (ms) in `finally`, increments `hoppity.publish.errors` on failure.

OTel meter instruments are initialised lazily on first message — the SDK and exporter need not be configured at import time.

#### Attributes

Both interceptors attach the same attribute set, derived from `InboundMetadata` / `OutboundMetadata`:

| Attribute | Value | Notes |
|-----------|-------|-------|
| `messaging.system` | `"rabbitmq"` | OTel semantic convention |
| `messaging.operation.type` | `"receive"` (inbound) / `"publish"` (outbound) | OTel semantic convention |
| `messaging.destination.name` | `contract.exchange` | The RabbitMQ exchange name |
| `hoppity.domain` | `contract._domain` | e.g. `"orders"` |
| `hoppity.operation` | `contract._name` | e.g. `"orderCreated"` |
| `hoppity.kind` | `meta.kind` | `"event"`, `"command"`, or `"rpc"` |
| `service.name` | `meta.serviceName` | The name passed to `hoppity.service()` |

Requires `@opentelemetry/api@^1.9.0` as a peer dependency. The caller provides the SDK, exporter, and SDK configuration — this package only calls the OTel API surface.

---

## Middleware vs Interceptors

These are different mechanisms. Do not conflate them.

| | Middleware (`.use()`) | Interceptors (`ServiceConfig.interceptors`) |
|---|---|---|
| When | Before broker creation (build time) | During message processing (runtime) |
| What | Modifies topology, lifecycle hooks | Wraps handler/publish execution |
| Scope | Service-level, once | Per-message |
| Examples | Custom logger, topology augmentation (DLX exchanges, shovel config) | Tracing, metrics, header injection |
| API | `(topology, context) => MiddlewareResult` | `(handler/publish, metadata) => wrappedHandler/publish` |

A concern belongs in middleware if it modifies the broker's topology or needs to run a lifecycle hook after the broker is created. It belongs in an interceptor if it needs to observe or decorate individual message operations at runtime.

---

## Build Phases (what happens inside `.build()`)

1. **Derive topology** — `deriveTopology(serviceName, handlers, publishes, connection, instanceId)` generates Rascal `BrokerConfig` from all handler and publish declarations.
2. **Merge topology** — `mergeTopology(rawTopology, derived)` merges the optional raw `topology` from `ServiceConfig` (as base) with the derived config. Derived config wins on conflicts.
3. **Middleware pipeline** — Each middleware runs in order, seeing the complete topology. Each can modify the topology and register an `onBrokerCreated` callback.
4. **Create broker** — `BrokerAsPromised.create(finalTopology)`.
5. **Wire handlers** — Subscribe event, command, and RPC handlers to their queues. Inbound interceptors are composed around each handler before subscription. Messages are acked automatically on success and nacked without requeue on error.
6. **Wire outbound** — Attach `publishEvent` and `sendCommand` to the broker. Outbound interceptors are composed per-call inside each publish method. If any RPC handlers or RPC callers exist, set up the reply queue subscription and attach `request` and `cancelRequest`. Wrap `broker.shutdown()` to drain pending RPC requests.
7. **Middleware callbacks** — Each `onBrokerCreated` callback runs against the fully-wired broker. Fail-fast: if any callback throws, the broker is shut down before the error propagates.

---

## Error Handling

### Build failures

```typescript
// Error message includes service name, middleware execution count, and original error:
// "Service 'order-service' broker creation failed. Pipeline executed 2 middleware(s). Original error: ..."
// error.cause holds the original error
```

### Middleware failures

```typescript
// Error message includes middleware index and name:
// "Middleware 2 (myMiddleware) failed: ..."
// error.cause holds the original error
```

### RPC errors

```typescript
import { RpcError, RpcErrorCode } from "@apogeelabs/hoppity";

try {
    const result = await broker.request(OrdersDomain.rpc.createOrder, { items });
} catch (err) {
    if (err instanceof RpcError) {
        switch (err.code) {
            case RpcErrorCode.HANDLER_ERROR:
                // The remote handler threw
                break;
            case RpcErrorCode.TIMEOUT:
                // Request exceeded ServiceConfig.defaultTimeout (30_000 ms unless overridden)
                break;
            case RpcErrorCode.CANCELLED:
                // broker.cancelRequest(correlationId) was called
                break;
        }
    }
}
```

### Handler errors (inbound)

- Event/command handlers: nack without requeue on error — the message goes to the dead-letter exchange if one is configured, otherwise it is dropped.
- RPC handlers: an error response is published back to the caller with `RpcErrorCode.HANDLER_ERROR`.
- Inbound Zod validation failures (when `validateInbound: true`): nack without requeue, and the error is logged.

---

## ConsoleLogger

Default logger used when no custom logger is provided.

```typescript
class ConsoleLogger implements Logger {
    silly(message: string, ...args: any[]): void; // → console.log
    debug(message: string, ...args: any[]): void; // → console.log
    info(message: string, ...args: any[]): void; // → console.log
    warn(message: string, ...args: any[]): void; // → console.warn
    error(message: string, ...args: any[]): void; // → console.error
    critical(message: string, ...args: any[]): void; // → console.error
}

const defaultLogger: ConsoleLogger; // singleton instance
```

---

## What Was Removed in v1

These packages and APIs no longer exist:

| Removed | Replacement |
|---------|-------------|
| `@apogeelabs/hoppity-contracts` | `defineDomain` is now in `@apogeelabs/hoppity` |
| `@apogeelabs/hoppity-operations` | `onEvent`, `onCommand`, `onRpc` are now in `@apogeelabs/hoppity` |
| `@apogeelabs/hoppity-rpc` | RPC is built into core — `broker.request()` / `broker.cancelRequest()` |
| `@apogeelabs/hoppity-subscriptions` | Handler wiring is built-in — no `withSubscriptions()` needed |
| `hoppity.withTopology()` | Use `hoppity.service(name, { topology: rawConfig })` |
| `RascalBuilder` | Use `ServiceBuilder` (returned by `hoppity.service()`) |
| `withOperations()` | Pass handlers directly in `ServiceConfig.handlers` |
| `withOutboundExchange()` | Cut from v1 — add manually via raw `topology` if needed |
| `buildServiceTopology()` | Topology is derived automatically — no manual call needed |
| Manual `topology.ts` files | Delete them — handlers + publishes replace them entirely |
| `as OperationsBroker` casts | `ServiceBroker` is the return type of `.build()` directly |