arazzo: 1.0.1 info: title: Amazon Kinesis Put Records and Read Back summary: Batch-write records, get a shard iterator, then read the records back. description: >- Demonstrates the core producer-to-consumer round trip in Kinesis Data Streams. The workflow writes a batch of records with PutRecords, captures the shard id the first record landed on, obtains a TRIM_HORIZON shard iterator for that shard with GetShardIterator, and then reads the records back with GetRecords. Each step inlines its AWS JSON protocol request, including the required X-Amz-Target header, so the round trip can be read and executed without opening the underlying OpenAPI description. version: 1.0.0 sourceDescriptions: - name: kinesisDataStreamsApi url: ../openapi/amazon-kinesis-data-streams-openapi.yml type: openapi workflows: - workflowId: put-records-and-read-back summary: Batch-put records, then read them back from the landing shard. description: >- Writes a batch of records with PutRecords, derives the shard from the first result entry, gets a TRIM_HORIZON iterator on that shard, and reads the records back. inputs: type: object required: - streamName - records properties: streamName: type: string description: The name of the Kinesis data stream to write to and read from. records: type: array description: >- The records to write, each an object with Data (Base64) and PartitionKey. items: type: object required: - Data - PartitionKey properties: Data: type: string description: The Base64-encoded data blob. PartitionKey: type: string description: The partition key for the record. steps: - stepId: putRecords description: >- Write the batch of records in a single call and capture the shard id of the first result entry to read from. operationId: PutRecords parameters: - name: X-Amz-Target in: header value: Kinesis_20131202.PutRecords requestBody: contentType: application/x-amz-json-1.1 payload: StreamName: $inputs.streamName Records: $inputs.records successCriteria: - condition: $statusCode == 200 outputs: failedRecordCount: $response.body#/FailedRecordCount firstShardId: $response.body#/Records/0/ShardId firstSequenceNumber: $response.body#/Records/0/SequenceNumber - stepId: getShardIterator description: >- Obtain a TRIM_HORIZON shard iterator for the shard the first record landed on, positioning the reader at the oldest record in the shard. operationId: GetShardIterator parameters: - name: X-Amz-Target in: header value: Kinesis_20131202.GetShardIterator requestBody: contentType: application/x-amz-json-1.1 payload: StreamName: $inputs.streamName ShardId: $steps.putRecords.outputs.firstShardId ShardIteratorType: TRIM_HORIZON successCriteria: - condition: $statusCode == 200 outputs: shardIterator: $response.body#/ShardIterator - stepId: getRecords description: >- Read the records back from the shard using the iterator, returning the retrieved records and the next iterator for continued reading. operationId: GetRecords parameters: - name: X-Amz-Target in: header value: Kinesis_20131202.GetRecords requestBody: contentType: application/x-amz-json-1.1 payload: ShardIterator: $steps.getShardIterator.outputs.shardIterator successCriteria: - condition: $statusCode == 200 outputs: records: $response.body#/Records nextShardIterator: $response.body#/NextShardIterator millisBehindLatest: $response.body#/MillisBehindLatest outputs: failedRecordCount: $steps.putRecords.outputs.failedRecordCount records: $steps.getRecords.outputs.records nextShardIterator: $steps.getRecords.outputs.nextShardIterator