---
# Arazzo workflow: launch a classic Dataflow template, poll the resulting job
# until it finishes, then read the job's metrics.
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Launch Classic Template and Monitor
  summary: >-
    Launch a job from a classic Dataflow template, poll it to completion,
    then read its metrics.
  description: >-
    Launches a Dataflow job from a classic template stored in Cloud Storage in
    a specific region, then watches the resulting job until it reaches a
    terminal state. The workflow launches the template, captures the new job
    id, polls the job state until it reports JOB_STATE_DONE, and finally pulls
    the execution metrics for the completed job. If the job instead reaches
    another terminal state (JOB_STATE_FAILED, JOB_STATE_CANCELLED,
    JOB_STATE_UPDATED or JOB_STATE_DRAINED), polling stops immediately and the
    workflow ends without reading metrics. Every step spells out its request
    inline, including the inline Bearer authorization Google Cloud requires,
    so the flow can be read and executed without opening the underlying
    OpenAPI description.
  version: 1.0.0
sourceDescriptions:
  - name: dataflowApi
    url: ../openapi/google-cloud-dataflow-api-openapi.yml
    type: openapi
workflows:
  - workflowId: launch-classic-template-and-monitor
    summary: Launch a classic template job, poll until done, and read its metrics.
    description: >-
      Launches a classic Dataflow template into a region, captures the created
      job id, polls the job state until JOB_STATE_DONE, then retrieves the job
      metrics. Polling stops early, and the workflow ends, if the job reaches
      any other terminal state.
    inputs:
      type: object
      required:
        - accessToken
        - projectId
        - location
        - gcsPath
        - jobName
      properties:
        accessToken:
          type: string
          description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
        projectId:
          type: string
          description: The Google Cloud project id that owns the job.
        location:
          type: string
          description: The regional endpoint that contains the job (e.g. us-central1).
        gcsPath:
          type: string
          description: Cloud Storage path to the classic template (gs://...).
        jobName:
          type: string
          description: The name to assign to the launched job.
        parameters:
          type: object
          description: Map of runtime template parameter name/value pairs.
          # The Dataflow API models template parameters as a string-to-string map.
          additionalProperties:
            type: string
    steps:
      - stepId: launchTemplate
        description: >-
          Launch a Dataflow job from the classic template at the supplied
          Cloud Storage path in the requested region.
        operationId: launchLocationTemplate
        parameters:
          - name: Authorization
            in: header
            value: Bearer $inputs.accessToken
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: gcsPath
            in: query
            value: $inputs.gcsPath
        requestBody:
          contentType: application/json
          payload:
            jobName: $inputs.jobName
            parameters: $inputs.parameters
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          # The launch response wraps the created job under `job`.
          jobId: $response.body#/job/id
          jobState: $response.body#/job/currentState

      - stepId: pollJobState
        description: >-
          Fetch the launched job and confirm it has reached the terminal
          JOB_STATE_DONE state before reading metrics. While the job is still
          in progress the step is retried; if it reaches any other terminal
          state the workflow ends instead of continuing to poll.
        operationId: getLocationJob
        parameters:
          - name: Authorization
            in: header
            value: Bearer $inputs.accessToken
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: jobId
            in: path
            value: $steps.launchTemplate.outputs.jobId
        # The step only succeeds once the job is DONE; every other state is
        # handled by the onFailure actions below.
        successCriteria:
          - condition: $statusCode == 200
          - context: $response.body
            condition: $.currentState == "JOB_STATE_DONE"
            type: jsonpath
        outputs:
          currentState: $response.body#/currentState
        onSuccess:
          - name: completed
            type: goto
            stepId: getMetrics
            criteria:
              - context: $response.body
                condition: $.currentState == "JOB_STATE_DONE"
                type: jsonpath
        # Arazzo runs the first failure action whose criteria all match, so the
        # terminal-state check must come before the generic retry.
        onFailure:
          # These states can never turn into JOB_STATE_DONE, so polling them
          # again is wasted work: end the workflow right away.
          - name: jobEndedWithoutSuccess
            type: end
            criteria:
              - condition: $statusCode == 200
              - context: $response.body#/currentState
                condition: '^JOB_STATE_(FAILED|CANCELLED|UPDATED|DRAINED)$'
                type: regex
          # Any other 200 response means the job has not reached a terminal
          # state yet: re-poll every 30 seconds, up to 60 times.
          - name: keepPolling
            type: retry
            retryAfter: 30
            retryLimit: 60
            criteria:
              - condition: $statusCode == 200

      - stepId: getMetrics
        description: >-
          Retrieve the execution metrics for the completed job, including
          counters and distributions emitted by the pipeline.
        operationId: getLocationJobMetrics
        parameters:
          - name: Authorization
            in: header
            value: Bearer $inputs.accessToken
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: jobId
            in: path
            value: $steps.launchTemplate.outputs.jobId
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          metricTime: $response.body#/metricTime
          metrics: $response.body#/metrics
    outputs:
      jobId: $steps.launchTemplate.outputs.jobId
      finalState: $steps.pollJobState.outputs.currentState
      metrics: $steps.getMetrics.outputs.metrics