arazzo: 1.0.1 info: title: Prisma Pulse Create and Resume a Named Stream summary: Create a resumable named event stream, read its cursor position, and fetch the last persisted event by ULID. description: >- Sets up a resumable Prisma Pulse named event stream and inspects its resumability state. The workflow creates a named stream for a model, lists the active streams to read the stream's saved cursor (the ULID of the last processed event), and branches: when a cursor exists it fetches that specific persisted event by ID to confirm the resume point. The live consumeStreamEvents endpoint is a long-lived server-sent-events connection and is not directly executable as a single request, so this flow resolves the resume point via the cursor and the getEvent lookup instead. Every step spells out its request inline so the flow can be read and executed without opening the underlying OpenAPI description. version: 1.0.0 sourceDescriptions: - name: pulseApi url: ../openapi/prisma-pulse-openapi.yml type: openapi workflows: - workflowId: create-and-resume-named-stream summary: Create a named stream, read its cursor, and fetch the last persisted event by ULID. description: >- Creates a resumable named stream for a model, lists streams to read the saved cursor position, and fetches the cursor event when one is present. inputs: type: object required: - apiKey - streamName - model properties: apiKey: type: string description: Pulse API key, sent as a Bearer token in the Authorization header. streamName: type: string description: Unique name for the resumable stream (e.g. all-user-events). model: type: string description: The Prisma model name to stream change events for (e.g. User). steps: - stepId: createStream description: >- Create a resumable named event stream for the supplied model. Requires event persistence to be enabled on the environment. operationId: createStream parameters: - name: Authorization in: header value: Bearer $inputs.apiKey requestBody: contentType: application/json payload: name: $inputs.streamName model: $inputs.model successCriteria: - condition: $statusCode == 201 outputs: streamName: $response.body#/name status: $response.body#/status cursor: $response.body#/cursor - stepId: readStreamCursor description: >- List the active streams and read the cursor position for the newly created stream, branching on whether a saved cursor event exists to resume from. operationId: listStreams parameters: - name: Authorization in: header value: Bearer $inputs.apiKey successCriteria: - condition: $statusCode == 200 outputs: cursor: $response.body#/data/0/cursor onSuccess: - name: cursorPresent type: goto stepId: fetchCursorEvent criteria: - context: $response.body condition: $.data[0].cursor != null type: jsonpath - name: noCursor type: end criteria: - context: $response.body condition: $.data[0].cursor == null type: jsonpath - stepId: fetchCursorEvent description: >- Fetch the specific persisted change event identified by the stream cursor ULID to confirm the resume point. operationId: getEvent parameters: - name: eventId in: path value: $steps.readStreamCursor.outputs.cursor - name: Authorization in: header value: Bearer $inputs.apiKey successCriteria: - condition: $statusCode == 200 outputs: eventId: $response.body#/id action: $response.body#/action outputs: streamName: $steps.createStream.outputs.streamName cursor: $steps.readStreamCursor.outputs.cursor resumeEventId: $steps.fetchCursorEvent.outputs.eventId