arazzo: 1.0.1
info:
  title: Snowflake Create, Verify, and Refresh a Pipe
  summary: Create a pipe with a COPY statement, describe it, then trigger a refresh to ingest files.
  description: >-
    Snowpipe ingestion flow. The workflow creates a pipe backed by a COPY INTO
    statement, describes the pipe to confirm it was created and read back its
    definition, and then triggers a refresh that queues staged files matching
    the pipe for ingestion. Each step inlines its Authorization bearer token and
    the X-Snowflake-Authorization-Token-Type header, its query parameters, and
    its JSON request body where applicable so the chain can be read and executed
    without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
  - name: pipeApi
    url: ../openapi/pipe.yaml
    type: openapi
workflows:
  - workflowId: create-and-refresh-pipe
    summary: Create a pipe, fetch it to confirm, then refresh it to queue staged files.
    description: >-
      Chains createPipe, fetchPipe, and refreshPipe so a Snowpipe is provisioned
      with a COPY statement, verified, and then refreshed, all scoped to the
      same database and schema.
    inputs:
      type: object
      required:
        - authToken
        - databaseName
        - schemaName
        - pipeName
        - copyStatement
      properties:
        authToken:
          type: string
          description: Bearer token (KEYPAIR_JWT, OAUTH, or programmatic access token).
        tokenType:
          type: string
          description: Value for the X-Snowflake-Authorization-Token-Type header.
          default: OAUTH
        databaseName:
          type: string
          description: Database that holds the schema and pipe.
        schemaName:
          type: string
          description: Schema that holds the pipe.
        pipeName:
          type: string
          description: Name of the pipe to create.
        copyStatement:
          type: string
          description: The COPY INTO statement that defines the pipe.
        comment:
          type: string
          description: Optional comment applied to the pipe.
    steps:
      - stepId: createPipe
        description: Create the pipe with its COPY INTO definition.
        operationId: createPipe
        parameters:
          - name: database
            in: path
            value: $inputs.databaseName
          - name: schema
            in: path
            value: $inputs.schemaName
          - name: createMode
            in: query
            value: errorIfExists
          # Expressions embedded in a longer string are wrapped in curly braces.
          - name: Authorization
            in: header
            value: Bearer {$inputs.authToken}
          - name: X-Snowflake-Authorization-Token-Type
            in: header
            value: $inputs.tokenType
        requestBody:
          contentType: application/json
          payload:
            name: $inputs.pipeName
            copy_statement: $inputs.copyStatement
            comment: $inputs.comment
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          status: $response.body#/status
      - stepId: fetchPipe
        description: Describe the pipe to confirm it was created.
        operationId: fetchPipe
        parameters:
          - name: database
            in: path
            value: $inputs.databaseName
          - name: schema
            in: path
            value: $inputs.schemaName
          - name: name
            in: path
            value: $inputs.pipeName
          - name: Authorization
            in: header
            value: Bearer {$inputs.authToken}
          - name: X-Snowflake-Authorization-Token-Type
            in: header
            value: $inputs.tokenType
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          pipeName: $response.body#/name
      - stepId: refreshPipe
        description: Refresh the pipe to queue staged files for ingestion.
        operationId: refreshPipe
        parameters:
          - name: database
            in: path
            value: $inputs.databaseName
          - name: schema
            in: path
            value: $inputs.schemaName
          - name: name
            in: path
            value: $inputs.pipeName
          - name: Authorization
            in: header
            value: Bearer {$inputs.authToken}
          - name: X-Snowflake-Authorization-Token-Type
            in: header
            value: $inputs.tokenType
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          status: $response.body#/status
    outputs:
      createStatus: $steps.createPipe.outputs.status
      confirmedPipe: $steps.fetchPipe.outputs.pipeName
      refreshStatus: $steps.refreshPipe.outputs.status
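
# Example inputs for the create-and-refresh-pipe workflow. This is an
# illustrative sketch only: every value below is a hypothetical placeholder
# (database, schema, pipe, table, and stage names are assumptions, not taken
# from this description), and how inputs are supplied depends on the Arazzo
# runner in use. It is kept as comments so the document stays valid.
#
# authToken: <token issued for your Snowflake account>
# tokenType: KEYPAIR_JWT        # omit to fall back to the default, OAUTH
# databaseName: EXAMPLE_DB      # hypothetical
# schemaName: EXAMPLE_SCHEMA    # hypothetical
# pipeName: EXAMPLE_PIPE        # hypothetical
# copyStatement: COPY INTO example_table FROM @example_stage   # hypothetical table and stage
# comment: Example pipe created by the workflow                # optional input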