arazzo: 1.0.1
info:
  title: Amazon Neptune Property Graph Stream Replay
  summary: Read property graph change records from the start of the stream, then continue from the last event id.
  description: >-
    Consumes the Neptune property graph change stream in two chained reads.
    The first read starts at TRIM_HORIZON to fetch the earliest available
    change-log records and captures the last event id (commit and operation
    numbers). The second read continues immediately after that position using
    AFTER_SEQUENCE_NUMBER with the captured commit and op numbers,
    demonstrating cursor-based stream pagination. Every step spells out its
    request inline so the flow can be read and executed without opening the
    underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
  - name: neptuneStreamsApi
    url: ../openapi/amazon-neptune-streams-openapi.yml
    type: openapi
workflows:
  - workflowId: propertygraph-stream-replay
    summary: Read the earliest property graph stream records, then page forward from the cursor.
    description: >-
      Reads property graph change records from TRIM_HORIZON, then continues
      after the returned last event id.
    inputs:
      type: object
      properties:
        limit:
          type: integer
          description: Maximum number of records to return per read (1-100000).
    steps:
      - stepId: readFromStart
        description: >-
          Read the earliest available property graph change-log records
          starting at TRIM_HORIZON and capture the last event id as a cursor.
        operationId: getPropertyGraphStream
        parameters:
          - name: iteratorType
            in: query
            value: TRIM_HORIZON
          - name: limit
            in: query
            value: $inputs.limit
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          records: $response.body#/records
          lastCommitNum: $response.body#/lastEventId/commitNum
          lastOpNum: $response.body#/lastEventId/opNum
          totalRecords: $response.body#/totalRecords
      - stepId: continueFromCursor
        description: >-
          Continue reading the stream immediately after the captured position
          using AFTER_SEQUENCE_NUMBER with the prior commit and operation
          numbers.
        operationId: getPropertyGraphStream
        parameters:
          - name: iteratorType
            in: query
            value: AFTER_SEQUENCE_NUMBER
          - name: commitNum
            in: query
            value: $steps.readFromStart.outputs.lastCommitNum
          - name: opNum
            in: query
            value: $steps.readFromStart.outputs.lastOpNum
          - name: limit
            in: query
            value: $inputs.limit
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          records: $response.body#/records
          lastCommitNum: $response.body#/lastEventId/commitNum
          totalRecords: $response.body#/totalRecords
    outputs:
      firstBatchRecords: $steps.readFromStart.outputs.records
      nextBatchRecords: $steps.continueFromCursor.outputs.records
      cursorCommitNum: $steps.continueFromCursor.outputs.lastCommitNum
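
# Illustrative sketch only, not part of the workflow definition.
# The two chained reads above would roughly correspond to the requests below.
# Assumptions (not stated in this file): the getPropertyGraphStream operation
# is a GET on /propertygraph/stream, and the caller supplies limit: 100.
# The <...> placeholders stand for the values captured by readFromStart.
#
# Example workflow inputs:
#   limit: 100
#
# Resulting request sequence:
#   1. GET /propertygraph/stream?iteratorType=TRIM_HORIZON&limit=100
#   2. GET /propertygraph/stream?iteratorType=AFTER_SEQUENCE_NUMBER&commitNum=<lastCommitNum>&opNum=<lastOpNum>&limit=100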