arazzo: 1.0.1 info: title: Google Cloud Dataflow Drain Running Job summary: Confirm a streaming job is running, request a drain, then poll until it is drained. description: >- Gracefully stops a running streaming Dataflow job by draining it, which stops ingesting new data while finishing in-flight work. The workflow reads the job to confirm it is running, issues an update that sets the requested state to JOB_STATE_DRAINING, then polls the job until it reports the terminal JOB_STATE_DRAINED state. Every step spells out its request inline, including the inline Bearer authorization Google Cloud requires, so the flow can be read and executed without opening the underlying OpenAPI description. version: 1.0.0 sourceDescriptions: - name: dataflowApi url: ../openapi/google-cloud-dataflow-api-openapi.yml type: openapi workflows: - workflowId: drain-running-job summary: Confirm running, request drain, and poll until JOB_STATE_DRAINED. description: >- Reads a job to confirm it is running, updates its requested state to JOB_STATE_DRAINING, then polls until it reaches JOB_STATE_DRAINED. inputs: type: object required: - accessToken - projectId - location - jobId properties: accessToken: type: string description: Google Cloud OAuth 2.0 access token used as a Bearer credential. projectId: type: string description: The Google Cloud project id that owns the job. location: type: string description: The regional endpoint that contains the job (e.g. us-central1). jobId: type: string description: The id of the running streaming job to drain. steps: - stepId: confirmRunning description: >- Read the job and confirm it is currently in the JOB_STATE_RUNNING state before requesting a drain. operationId: getLocationJob parameters: - name: Authorization in: header value: Bearer $inputs.accessToken - name: projectId in: path value: $inputs.projectId - name: location in: path value: $inputs.location - name: jobId in: path value: $inputs.jobId successCriteria: - condition: $statusCode == 200 - context: $response.body condition: $.currentState == "JOB_STATE_RUNNING" type: jsonpath outputs: currentState: $response.body#/currentState - stepId: requestDrain description: >- Update the job's requested state to JOB_STATE_DRAINING so it stops ingesting new data while finishing in-flight work. operationId: updateLocationJob parameters: - name: Authorization in: header value: Bearer $inputs.accessToken - name: projectId in: path value: $inputs.projectId - name: location in: path value: $inputs.location - name: jobId in: path value: $inputs.jobId - name: updateMask in: query value: requestedState requestBody: contentType: application/json payload: requestedState: JOB_STATE_DRAINING successCriteria: - condition: $statusCode == 200 outputs: requestedState: $response.body#/requestedState - stepId: pollUntilDrained description: >- Poll the job until it reports the terminal JOB_STATE_DRAINED state, retrying while it is still draining. operationId: getLocationJob parameters: - name: Authorization in: header value: Bearer $inputs.accessToken - name: projectId in: path value: $inputs.projectId - name: location in: path value: $inputs.location - name: jobId in: path value: $inputs.jobId successCriteria: - condition: $statusCode == 200 - context: $response.body condition: $.currentState == "JOB_STATE_DRAINED" type: jsonpath outputs: currentState: $response.body#/currentState onFailure: - name: keepPolling type: retry retryAfter: 30 retryLimit: 60 criteria: - condition: $statusCode == 200 outputs: jobId: $inputs.jobId finalState: $steps.pollUntilDrained.outputs.currentState