---
# Arazzo workflow description: snapshot a running streaming Dataflow job.
#
# The single workflow below runs three steps in order:
#   1. confirmRunning  - read the job and require currentState to be
#                        JOB_STATE_RUNNING.
#   2. createSnapshot  - create the snapshot and capture its id and state.
#   3. confirmSnapshot - read the snapshot back using the captured id.
#
# Every step passes the caller-supplied access token as a Bearer credential in
# an explicit Authorization header parameter.
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Snapshot Streaming Job
  summary: Confirm a streaming job is running, take a snapshot, then read the snapshot back.
  description: >-
    Captures the state of a running streaming Dataflow job into a snapshot
    that can later seed a new job. The workflow reads the job to confirm it is
    running, creates a snapshot with the supplied retention window, captures
    the new snapshot id, then reads the snapshot back to confirm its state and
    metadata. Every step spells out its request inline, including the inline
    Bearer authorization Google Cloud requires, so the flow can be read and
    executed without opening the underlying OpenAPI description.
  version: 1.0.0

# The operationIds used by the steps are resolved against this OpenAPI source.
sourceDescriptions:
  - name: dataflowApi
    url: ../openapi/google-cloud-dataflow-api-openapi.yml
    type: openapi

workflows:
  - workflowId: snapshot-streaming-job
    summary: Confirm a streaming job is running, snapshot it, and read the snapshot.
    description: >-
      Reads a job to confirm it is running, creates a snapshot of the streaming
      pipeline, then retrieves the created snapshot to confirm its state.

    # Workflow inputs. `ttl` and `description` are optional; the other four
    # are required and are used to authorize and address each request.
    inputs:
      type: object
      required:
        - accessToken
        - projectId
        - location
        - jobId
      properties:
        accessToken:
          type: string
          description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
        projectId:
          type: string
          description: The Google Cloud project id that owns the job.
        location:
          type: string
          description: The regional endpoint that contains the job (e.g. us-central1).
        jobId:
          type: string
          description: The id of the running streaming job to snapshot.
        ttl:
          type: string
          description: Snapshot retention duration as a duration string (e.g. 604800s).
        description:
          type: string
          description: A human-readable description for the snapshot.

    steps:
      # Step 1: gate the workflow on the job being in the running state.
      - stepId: confirmRunning
        description: >-
          Read the job and confirm it is currently in the JOB_STATE_RUNNING
          state before creating a snapshot.
        operationId: getLocationJob
        parameters:
          # A runtime expression embedded inside a larger string must be
          # wrapped in curly braces; without them the literal text
          # "$inputs.accessToken" would be sent instead of the token.
          - name: Authorization
            in: header
            value: 'Bearer {$inputs.accessToken}'
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: jobId
            in: path
            value: $inputs.jobId
        successCriteria:
          - condition: $statusCode == 200
          # Simple condition using a JSON Pointer into the response body.
          # A bare comparison such as `$.currentState == "..."` is not a valid
          # RFC 9535 JSONPath query, so it cannot be used with type: jsonpath.
          # String literals in simple conditions use single quotes.
          - condition: "$response.body#/currentState == 'JOB_STATE_RUNNING'"
        outputs:
          currentState: $response.body#/currentState

      # Step 2: create the snapshot and capture the identifiers that the
      # read-back step needs.
      - stepId: createSnapshot
        description: >-
          Create a snapshot of the running streaming job, capturing its state
          for later reuse with the supplied retention window.
        operationId: snapshotLocationJob
        parameters:
          # Curly braces are required for expressions embedded in a string.
          - name: Authorization
            in: header
            value: 'Bearer {$inputs.accessToken}'
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: jobId
            in: path
            value: $inputs.jobId
        requestBody:
          contentType: application/json
          payload:
            ttl: $inputs.ttl
            location: $inputs.location
            description: $inputs.description
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          snapshotId: $response.body#/id
          snapshotState: $response.body#/state

      # Step 3: read the snapshot back using the id captured by createSnapshot.
      - stepId: confirmSnapshot
        description: >-
          Read the newly created snapshot back to confirm its state, source
          job, and creation metadata.
        operationId: getLocationSnapshot
        parameters:
          # Curly braces are required for expressions embedded in a string.
          - name: Authorization
            in: header
            value: 'Bearer {$inputs.accessToken}'
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: snapshotId
            in: path
            value: $steps.createSnapshot.outputs.snapshotId
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          snapshotId: $response.body#/id
          state: $response.body#/state
          sourceJobId: $response.body#/sourceJobId
          creationTime: $response.body#/creationTime

    # Workflow-level outputs, taken from the read-back step.
    outputs:
      snapshotId: $steps.confirmSnapshot.outputs.snapshotId
      state: $steps.confirmSnapshot.outputs.state
      sourceJobId: $steps.confirmSnapshot.outputs.sourceJobId