arazzo: 1.0.1 info: title: Snowflake Create a Table and a Change Stream On It summary: Create a table, create a stream that tracks changes to it, then describe the stream. description: >- Change-data-capture provisioning flow. The workflow creates a source table, creates a stream that tracks DML changes to that table, and describes the stream to confirm it was created and read back its source. Each step inlines its Authorization bearer token and the X-Snowflake-Authorization-Token-Type header, its create-mode query parameter, and its JSON request body where applicable so the chain can be read and executed without opening the underlying OpenAPI descriptions. Because the table and stream live in separate OpenAPI documents, chaining flows through workflow inputs and step outputs. version: 1.0.0 sourceDescriptions: - name: tableApi url: ../openapi/table.yaml type: openapi - name: streamApi url: ../openapi/stream.yaml type: openapi workflows: - workflowId: create-stream-on-table summary: Create a table, create a stream tracking it, then fetch the stream to confirm. description: >- Chains createTable, createStream, and fetchStream so a source table is provisioned, a change stream is created against it, and the stream is verified, all scoped to the same database and schema. inputs: type: object required: - authToken - databaseName - schemaName - tableName - columns - streamName properties: authToken: type: string description: Bearer token (KEYPAIR_JWT, OAUTH, or programmatic access token). tokenType: type: string description: Value for the X-Snowflake-Authorization-Token-Type header. default: OAUTH databaseName: type: string description: Database that holds the schema, table, and stream. schemaName: type: string description: Schema that holds the table and stream. tableName: type: string description: Name of the source table to create and track. columns: type: array description: Column definitions for the source table (name and datatype). items: type: object streamName: type: string description: Name of the stream to create on the table. comment: type: string description: Optional comment applied to the stream. steps: - stepId: createTable description: Create the source table that the stream will track. operationId: createTable parameters: - name: database in: path value: $inputs.databaseName - name: schema in: path value: $inputs.schemaName - name: createMode in: query value: errorIfExists - name: Authorization in: header value: Bearer $inputs.authToken - name: X-Snowflake-Authorization-Token-Type in: header value: $inputs.tokenType requestBody: contentType: application/json payload: name: $inputs.tableName columns: $inputs.columns change_tracking: true successCriteria: - condition: $statusCode == 200 outputs: status: $response.body#/status - stepId: createStream description: Create the stream that tracks changes to the source table. operationId: createStream parameters: - name: database in: path value: $inputs.databaseName - name: schema in: path value: $inputs.schemaName - name: createMode in: query value: errorIfExists - name: Authorization in: header value: Bearer $inputs.authToken - name: X-Snowflake-Authorization-Token-Type in: header value: $inputs.tokenType requestBody: contentType: application/json payload: name: $inputs.streamName table_name: $inputs.tableName comment: $inputs.comment successCriteria: - condition: $statusCode == 200 outputs: status: $response.body#/status - stepId: fetchStream description: Describe the stream to confirm it was created and read back its source. operationId: fetchStream parameters: - name: database in: path value: $inputs.databaseName - name: schema in: path value: $inputs.schemaName - name: name in: path value: $inputs.streamName - name: Authorization in: header value: Bearer $inputs.authToken - name: X-Snowflake-Authorization-Token-Type in: header value: $inputs.tokenType successCriteria: - condition: $statusCode == 200 outputs: streamName: $response.body#/name sourceTable: $response.body#/table_name outputs: tableStatus: $steps.createTable.outputs.status streamStatus: $steps.createStream.outputs.status confirmedStream: $steps.fetchStream.outputs.streamName