arazzo: 1.0.1
info:
  title: Amazon Neptune ML Data Processing to Model Training
  summary: Run a Neptune ML data processing job to completion, then launch and poll model training.
  description: >-
    Chains the first two stages of the Neptune ML pipeline. The workflow creates
    a data processing job that exports and prepares graph data, polls that job
    until its status is Completed, then launches a model training job that
    consumes the processed output and polls it until training completes. Both
    poll loops use a retry delay to handle the in-progress states. Every step
    spells out its request inline so the flow can be read and executed without
    opening the underlying OpenAPI description.
  version: 1.0.0
sourceDescriptions:
  - name: neptuneMlApi
    url: ../openapi/amazon-neptune-ml-openapi.yml
    type: openapi
workflows:
  - workflowId: ml-dataprocessing-to-training
    summary: Process graph data, then train a model on the processed output.
    description: >-
      Creates and polls a data processing job, then creates and polls a model
      training job that consumes the processed data.
    inputs:
      type: object
      required:
        - inputDataS3Location
        - processedDataS3Location
        - trainModelS3Location
      properties:
        inputDataS3Location:
          type: string
          description: S3 URI for the input graph data.
        processedDataS3Location:
          type: string
          description: S3 URI where processed output is written.
        trainModelS3Location:
          type: string
          description: S3 location for trained model artifacts.
        modelName:
          type: string
          description: The model type to train (rgcn, transe, distmult, rotate, custom).
    steps:
      - stepId: startProcessing
        description: >-
          Create a data processing job that exports and prepares graph data into
          the processed-data S3 location.
        operationId: createDataProcessingJob
        requestBody:
          contentType: application/json
          payload:
            inputDataS3Location: $inputs.inputDataS3Location
            processedDataS3Location: $inputs.processedDataS3Location
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          processingJobId: $response.body#/id
      - stepId: pollProcessing
        description: >-
          Poll the data processing job. Retry while it is still in progress and
          continue once its status is Completed.
        operationId: getDataProcessingJobStatus
        parameters:
          - name: id
            in: path
            value: $steps.startProcessing.outputs.processingJobId
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          processingStatus: $response.body#/status
        onSuccess:
          - name: processingRunning
            type: retry
            retryAfter: 60
            retryLimit: 120
            criteria:
              - context: $response.body
                condition: $.status != "Completed"
                type: jsonpath
          - name: processingDone
            type: goto
            stepId: startTraining
            criteria:
              - context: $response.body
                condition: $.status == "Completed"
                type: jsonpath
      - stepId: startTraining
        description: >-
          Launch a model training job that consumes the completed data processing
          job's output.
        operationId: createModelTrainingJob
        requestBody:
          contentType: application/json
          payload:
            dataProcessingJobId: $steps.startProcessing.outputs.processingJobId
            trainModelS3Location: $inputs.trainModelS3Location
            modelName: $inputs.modelName
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          trainingJobId: $response.body#/id
      - stepId: pollTraining
        description: >-
          Poll the model training job. Retry while it is still in progress and
          finish once its status is Completed.
        operationId: getModelTrainingJobStatus
        parameters:
          - name: id
            in: path
            value: $steps.startTraining.outputs.trainingJobId
        successCriteria:
          - condition: $statusCode == 200
        outputs:
          trainingStatus: $response.body#/status
        onSuccess:
          - name: trainingRunning
            type: retry
            retryAfter: 60
            retryLimit: 120
            criteria:
              - context: $response.body
                condition: $.status != "Completed"
                type: jsonpath
          - name: trainingDone
            type: end
            criteria:
              - context: $response.body
                condition: $.status == "Completed"
                type: jsonpath
    outputs:
      processingJobId: $steps.startProcessing.outputs.processingJobId
      trainingJobId: $steps.startTraining.outputs.trainingJobId
      trainingStatus: $steps.pollTraining.outputs.trainingStatus