---
# Arazzo workflow: launch a Dataflow Flex Template job, wait for it to reach
# JOB_STATE_DONE, then read its execution metrics.
arazzo: 1.0.1
info:
  title: Google Cloud Dataflow Launch Flex Template and Monitor
  summary: Launch a containerized Flex Template job, poll it to completion, then read its metrics.
  description: >-
    Launches a Dataflow job from a Flex Template, which packages the pipeline
    code in a Docker container and supports dynamic configuration at launch
    time. The workflow submits the launch parameter, captures the created job
    id, polls the job until it reports JOB_STATE_DONE, and then retrieves the
    execution metrics. Every step spells out its request inline, including the
    Bearer authorization header Google Cloud requires, so the flow can be read
    and executed without opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
  - name: dataflowApi
    url: ../openapi/google-cloud-dataflow-api-openapi.yml
    type: openapi
workflows:
  - workflowId: launch-flex-template-and-monitor
    summary: Launch a Flex Template job, poll until done, and read its metrics.
    description: >-
      Launches a Flex Template into a region, captures the created job id,
      polls the job state until JOB_STATE_DONE, then retrieves the job
      metrics. Polling stops early, ending the workflow, if the job reports
      JOB_STATE_FAILED or JOB_STATE_CANCELLED.
    inputs:
      type: object
      required:
        - accessToken
        - projectId
        - location
        - jobName
        - containerSpecGcsPath
      properties:
        accessToken:
          type: string
          description: Google Cloud OAuth 2.0 access token used as a Bearer credential.
        projectId:
          type: string
          description: The Google Cloud project id that owns the job.
        location:
          type: string
          description: The regional endpoint that contains the job (e.g. us-central1).
        jobName:
          type: string
          description: The name to assign to the launched job.
        containerSpecGcsPath:
          type: string
          description: Cloud Storage path to the Flex Template container spec file (gs://...).
        parameters:
          type: object
          description: Map of runtime template parameter name/value pairs (values are strings).
          additionalProperties:
            type: string
    steps:
      - stepId: launchFlexTemplate
        description: >-
          Launch a Dataflow job from the Flex Template container spec in the
          requested region.
        operationId: launchLocationFlexTemplate
        parameters:
          - name: Authorization
            in: header
            value: Bearer $inputs.accessToken
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
        requestBody:
          contentType: application/json
          payload:
            launchParameter:
              jobName: $inputs.jobName
              containerSpecGcsPath: $inputs.containerSpecGcsPath
              parameters: $inputs.parameters
            # false: actually launch the job rather than only validating the request.
            validateOnly: false
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          jobId: $response.body#/job/id
          jobState: $response.body#/job/currentState
      - stepId: pollJobState
        description: >-
          Fetch the launched job and confirm it has reached the terminal
          JOB_STATE_DONE state before reading metrics. While the job is still
          in progress the step is retried; a job that ends in JOB_STATE_FAILED
          or JOB_STATE_CANCELLED ends the workflow instead of being polled
          until the retry limit is exhausted.
        operationId: getLocationJob
        parameters:
          - name: Authorization
            in: header
            value: Bearer $inputs.accessToken
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: jobId
            in: path
            value: $steps.launchFlexTemplate.outputs.jobId
        successCriteria:
          - condition: $statusCode == 200
          # Simple condition over a runtime expression; Arazzo string literals
          # are single-quoted.
          - condition: "$response.body#/currentState == 'JOB_STATE_DONE'"
        outputs:
          currentState: $response.body#/currentState
        onSuccess:
          # Success criteria already guarantee JOB_STATE_DONE here.
          - name: completed
            type: goto
            stepId: getMetrics
        onFailure:
          # Failure actions are evaluated in order: check terminal failure
          # states first so they end the workflow instead of being retried.
          - name: jobFailed
            type: end
            criteria:
              - condition: "$response.body#/currentState == 'JOB_STATE_FAILED'"
          - name: jobCancelled
            type: end
            criteria:
              - condition: "$response.body#/currentState == 'JOB_STATE_CANCELLED'"
          # Any other successfully fetched state is still in progress: poll
          # again every 30 seconds, at most 60 times.
          - name: keepPolling
            type: retry
            retryAfter: 30
            retryLimit: 60
            criteria:
              - condition: $statusCode == 200
      - stepId: getMetrics
        description: >-
          Retrieve the execution metrics for the completed job, including
          counters and distributions emitted by the pipeline.
        operationId: getLocationJobMetrics
        parameters:
          - name: Authorization
            in: header
            value: Bearer $inputs.accessToken
          - name: projectId
            in: path
            value: $inputs.projectId
          - name: location
            in: path
            value: $inputs.location
          - name: jobId
            in: path
            value: $steps.launchFlexTemplate.outputs.jobId
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          metricTime: $response.body#/metricTime
          metrics: $response.body#/metrics
    outputs:
      jobId: $steps.launchFlexTemplate.outputs.jobId
      finalState: $steps.pollJobState.outputs.currentState
      metrics: $steps.getMetrics.outputs.metrics