# PIPELINE DEFINITION
# Name: battery-ml-pipeline
# Description: Complete ML pipeline for battery stress detection and time-to-failure prediction
# Inputs:
#    aws_access_key_id: str [Default: 'minio']
#    aws_s3_bucket: str [Default: 'inference']
#    aws_s3_endpoint: str [Default: 'http://minio-microshift-vm.microshift-001.svc.cluster.local:30000']
#    aws_secret_access_key: str [Default: 'minio123']
#    influxdb_bucket: str [Default: 'bms']
#    influxdb_org: str [Default: 'redhat']
#    influxdb_token: str [Default: 'admin_token']
#    influxdb_url: str [Default: 'https://influxdb-battery-demo.apps.replace-domain.io/']
#
# NOTE(review): this appears to be KFP compiler output (kfp==2.14.4 is pinned in
# every executor bootstrap below). Mirror any change made here in the
# pipeline's Python source, or the next compile will overwrite it.
# NOTE(review): the defaults listed above include credentials
# (aws_secret_access_key, influxdb_token); prefer supplying them at run time
# (e.g. from a Kubernetes Secret) instead of baking them into the spec.
components:
  comp-condition-1:
    # Runs only when validate-stress-model returns should_update == 'true'.
    dag:
      tasks:
        save-stress-model-to-s3:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-save-stress-model-to-s3
          inputs:
            artifacts:
              stress_model:
                componentInputArtifact: pipelinechannel--train-stress-detection-model-stress_model
            parameters:
              aws_access_key_id:
                componentInputParameter: pipelinechannel--aws_access_key_id
              aws_s3_bucket:
                componentInputParameter: pipelinechannel--aws_s3_bucket
              aws_s3_endpoint:
                componentInputParameter: pipelinechannel--aws_s3_endpoint
              aws_secret_access_key:
                componentInputParameter: pipelinechannel--aws_secret_access_key
          taskInfo:
            name: save-stress-model-to-s3
    inputDefinitions:
      artifacts:
        pipelinechannel--train-stress-detection-model-stress_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--aws_access_key_id:
          parameterType: STRING
        pipelinechannel--aws_s3_bucket:
          parameterType: STRING
        pipelinechannel--aws_s3_endpoint:
          parameterType: STRING
        pipelinechannel--aws_secret_access_key:
          parameterType: STRING
        pipelinechannel--validate-stress-model-should_update:
          parameterType: STRING
  comp-condition-2:
    # Runs only when validate-ttf-model approves the new TTF model.
    dag:
      tasks:
        save-ttf-model-to-s3:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-save-ttf-model-to-s3
          inputs:
            artifacts:
              ttf_model:
                componentInputArtifact: pipelinechannel--train-ttf-model-ttf_model
            parameters:
              aws_access_key_id:
                componentInputParameter: pipelinechannel--aws_access_key_id
              aws_s3_bucket:
                componentInputParameter: pipelinechannel--aws_s3_bucket
              aws_s3_endpoint:
                componentInputParameter: pipelinechannel--aws_s3_endpoint
              aws_secret_access_key:
                componentInputParameter: pipelinechannel--aws_secret_access_key
          taskInfo:
            name: save-ttf-model-to-s3
    inputDefinitions:
      artifacts:
        pipelinechannel--train-ttf-model-ttf_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        pipelinechannel--aws_access_key_id:
          parameterType: STRING
        pipelinechannel--aws_s3_bucket:
          parameterType: STRING
        pipelinechannel--aws_s3_endpoint:
          parameterType: STRING
        pipelinechannel--aws_secret_access_key:
          parameterType: STRING
        pipelinechannel--validate-ttf-model-should_update:
          parameterType: STRING
  comp-prepare-data:
    executorLabel: exec-prepare-data
    inputDefinitions:
      artifacts:
        raw_data:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
    outputDefinitions:
      artifacts:
        prepared_data:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        prepared_records:
          parameterType: NUMBER_INTEGER
  comp-retrieve-influx-data:
    executorLabel: exec-retrieve-influx-data
    inputDefinitions:
      parameters:
        influxdb_bucket:
          parameterType: STRING
        influxdb_org:
          parameterType: STRING
        influxdb_token:
          parameterType: STRING
        influxdb_url:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        raw_data:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        records_count:
          parameterType: NUMBER_INTEGER
  comp-save-stress-model-to-s3:
    executorLabel: exec-save-stress-model-to-s3
    inputDefinitions:
      artifacts:
        stress_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        aws_access_key_id:
          parameterType: STRING
        aws_s3_bucket:
          parameterType: STRING
        aws_s3_endpoint:
          parameterType: STRING
        aws_secret_access_key:
          parameterType: STRING
    outputDefinitions:
      parameters:
        upload_status:
          parameterType: STRING
  comp-save-ttf-model-to-s3:
    executorLabel: exec-save-ttf-model-to-s3
    inputDefinitions:
      artifacts:
        ttf_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        aws_access_key_id:
          parameterType: STRING
        aws_s3_bucket:
          parameterType: STRING
        aws_s3_endpoint:
          parameterType: STRING
        aws_secret_access_key:
          parameterType: STRING
    outputDefinitions:
      parameters:
        upload_status:
          parameterType: STRING
  comp-train-stress-detection-model:
    executorLabel: exec-train-stress-detection-model
    inputDefinitions:
      artifacts:
        prepared_data:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
    outputDefinitions:
      artifacts:
        stress_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        accuracy:
          parameterType: NUMBER_DOUBLE
        stress_events:
          parameterType: NUMBER_INTEGER
  comp-train-ttf-model:
    executorLabel: exec-train-ttf-model
    inputDefinitions:
      artifacts:
        prepared_data:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
    outputDefinitions:
      artifacts:
        ttf_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        mae:
          parameterType: NUMBER_DOUBLE
  comp-validate-stress-model:
    executorLabel: exec-validate-stress-model
    inputDefinitions:
      artifacts:
        new_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
        prepared_data:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        aws_access_key_id:
          parameterType: STRING
        aws_s3_bucket:
          parameterType: STRING
        aws_s3_endpoint:
          parameterType: STRING
        aws_secret_access_key:
          parameterType: STRING
        new_model_accuracy:
          parameterType: NUMBER_DOUBLE
    outputDefinitions:
      parameters:
        current_accuracy:
          parameterType: NUMBER_DOUBLE
        new_accuracy:
          parameterType: NUMBER_DOUBLE
        should_update:
          parameterType: STRING
  comp-validate-ttf-model:
    executorLabel: exec-validate-ttf-model
    inputDefinitions:
      artifacts:
        new_model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
        prepared_data:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        aws_access_key_id:
          parameterType: STRING
        aws_s3_bucket:
          parameterType: STRING
        aws_s3_endpoint:
          parameterType: STRING
        aws_secret_access_key:
          parameterType: STRING
        new_model_mae:
          parameterType: NUMBER_DOUBLE
    outputDefinitions:
      parameters:
        current_mae:
          parameterType: NUMBER_DOUBLE
        new_mae:
          parameterType: NUMBER_DOUBLE
        should_update:
          parameterType: STRING
# NOTE(review): placeholder GCS root; with the MinIO/S3 backend used by the
# components this presumably has to be overridden at submit time — confirm.
defaultPipelineRoot: gs://your-bucket/pipeline-root
# NOTE(review): every executor uses the mutable `:latest` image tag and
# unpinned pip packages (except kfp), so reruns are not reproducible; pin an
# image digest and package versions.
deploymentSpec:
  executors:
    exec-prepare-data:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - prepare_data
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'pandas' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def prepare_data(
              raw_data: Input[Dataset],
              prepared_data: Output[Dataset]
          ) -> NamedTuple('Outputs', [('prepared_records', int)]):
              """
              Component 02: Prepare data by pivoting columns

              Turns the long InfluxDB export (one row per _time/batteryId/_field)
              into one row per (_time, batteryId) with one column per field, and
              renames `_time` to `timestamp`.
              """
              import pandas as pd

              # Load InfluxDB CSV Data
              df = pd.read_csv(raw_data.path)

              # Pivot the data so that '_field' values become columns.
              # NOTE(review): DataFrame.pivot raises ValueError if any
              # (_time, batteryId, _field) combination repeats (e.g. if the
              # measurement carries extra tags) — confirm the key is unique.
              df_pivot = df.pivot(index=["_time", "batteryId"], columns="_field", values="_value").reset_index()

              # Rename `_time` to `timestamp` for clarity
              df_pivot.rename(columns={"_time": "timestamp"}, inplace=True)

              # Save prepared data
              df_pivot.to_csv(prepared_data.path, index=False)

              print(f"Prepared {len(df_pivot)} records")
              print("Data columns:", list(df_pivot.columns))

              from collections import namedtuple
              output = namedtuple('Outputs', ['prepared_records'])
              return output(len(df_pivot))
        image: registry.access.redhat.com/ubi9/python-311:latest
    exec-retrieve-influx-data:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - retrieve_influx_data
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'pandas' 'influxdb-client' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def retrieve_influx_data(
              influxdb_url: str,
              influxdb_token: str,
              influxdb_org: str,
              influxdb_bucket: str,
              raw_data: Output[Dataset]
          ) -> NamedTuple('Outputs', [('records_count', int)]):
              """
              Component 01: Retrieve data from InfluxDB

              Queries the last hour of `battery_data` from the given bucket and
              writes every record's values to raw_data as CSV. If the query
              fails, a small built-in sample is written instead (best-effort).
              Returns the number of rows written.
              """
              from influxdb_client import InfluxDBClient
              import pandas as pd

              # Initialize Client (disable SSL verification for self-signed certificates)
              # NOTE(review): verify_ssl=False also removes MITM protection; prefer
              # trusting the cluster CA instead.
              client = InfluxDBClient(url=influxdb_url, token=influxdb_token, org=influxdb_org, verify_ssl=False)

              def retrieve_battery_data():
                  query = f'''
                  from(bucket: "{influxdb_bucket}")
                    |> range(start: -1h)
                    |> filter(fn: (r) => r["_measurement"] == "battery_data")
                  '''
                  query_api = client.query_api()
                  tables = query_api.query(query, org=influxdb_org)

                  # Process Results
                  data = []
                  for table in tables:
                      for record in table.records:
                          data.append(record.values)

                  df = pd.DataFrame(data)
                  return df

              try:
                  df = retrieve_battery_data()
                  # Save raw data
                  df.to_csv(raw_data.path, index=False)
                  records_count = len(df)
                  print(f"Retrieved {records_count} records from InfluxDB")
                  if records_count == 0:
                      print("Warning: no battery_data records in the last hour; downstream steps will likely fail")

              except Exception as e:
                  print(f"Error connecting to InfluxDB: {e}")
                  # Fallback: use sample data
                  # NOTE(review): this sample has no `currentLoad` field (read by the
                  # stress components) and pivots to a single row, so training
                  # downstream cannot succeed on it — confirm whether failing fast
                  # would be preferable.
                  print("Using fallback sample data...")
                  sample_data = {
                      '_time': ['2025-02-12 14:26:34.190000+00:00'] * 8,
                      'batteryId': [1] * 8,
                      '_field': ['ambientTemp', 'batteryCurrent', 'batteryTemp', 'batteryVoltage',
                                 'distance', 'kmh', 'stateOfCharge', 'stateOfHealth'],
                      '_value': [18.65, 78.06, 25.22, 396.39, 0.2, 127.75, 0.9991, 99.9998]
                  }
                  df = pd.DataFrame(sample_data)
                  df.to_csv(raw_data.path, index=False)
                  records_count = len(df)
              finally:
                  # Release the client's connections even if the fallback itself fails.
                  client.close()

              from collections import namedtuple
              output = namedtuple('Outputs', ['records_count'])
              return output(records_count)
        image: registry.access.redhat.com/ubi9/python-311:latest
    exec-save-stress-model-to-s3:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - save_stress_model_to_s3
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'boto3' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def save_stress_model_to_s3(
              stress_model: Input[Model],
              aws_access_key_id: str,
              aws_secret_access_key: str,
              aws_s3_endpoint: str,
              aws_s3_bucket: str
          ) -> NamedTuple('Outputs', [('upload_status', str)]):
              """
              Component: Save stress detection model to S3

              Uploads the OpenVINO IR (.xml/.bin) and stress_scaler.pkl found under
              stress_model.path. Nothing is uploaded unless all three are present:
              training swallows OpenVINO conversion failures, and uploading only
              the scaler would pair a new scaler with the previously deployed model.
              Failures are reported through upload_status; the task itself does
              not fail.
              """
              import boto3
              import os
              from botocore.exceptions import ClientError

              # Upload order: weights, then graph, then scaler (presumably a server
              # watching these keys reacts to the .xml — TODO confirm).
              required_keys = [
                  "stress-detection/1/stress-detection.bin",
                  "stress-detection/1/stress-detection.xml",
                  "scalers/stress_scaler.pkl",
              ]

              try:
                  # Map destination S3 key -> local file; only .xml, .bin and scaler matter
                  local_files = {}
                  for root, dirs, files in os.walk(stress_model.path):
                      for file in files:
                          file_path = os.path.join(root, file)
                          if file.endswith('.xml'):
                              local_files["stress-detection/1/stress-detection.xml"] = file_path
                          elif file.endswith('.bin'):
                              local_files["stress-detection/1/stress-detection.bin"] = file_path
                          elif file == "stress_scaler.pkl":
                              local_files["scalers/stress_scaler.pkl"] = file_path
                          # Skip other files (.keras, .pb, .index, .data-*)

                  missing = [key for key in required_keys if key not in local_files]
                  if missing:
                      status = f"Error uploading stress model to S3: model artifact is missing {missing}; nothing uploaded"
                      print(status)
                  else:
                      # Create s3 connection
                      s3_client = boto3.client(
                          's3',
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key,
                          endpoint_url=aws_s3_endpoint
                      )
                      for s3_key in required_keys:
                          s3_client.upload_file(local_files[s3_key], aws_s3_bucket, s3_key)

                      status = f"Successfully uploaded {len(required_keys)} stress model files to S3"
                      print(status)

              except ClientError as e:
                  status = f"Error uploading stress model to S3: {e}"
                  print(status)
              except Exception as e:
                  status = f"Unexpected error: {e}"
                  print(status)

              from collections import namedtuple
              output = namedtuple('Outputs', ['upload_status'])
              return output(status)
        image: registry.access.redhat.com/ubi9/python-311:latest
    exec-save-ttf-model-to-s3:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - save_ttf_model_to_s3
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'boto3' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def save_ttf_model_to_s3(
              ttf_model: Input[Model],
              aws_access_key_id: str,
              aws_secret_access_key: str,
              aws_s3_endpoint: str,
              aws_s3_bucket: str
          ) -> NamedTuple('Outputs', [('upload_status', str)]):
              """
              Component: Save TTF model to S3

              Uploads the OpenVINO IR (.xml/.bin) and ttf_scaler.pkl found under
              ttf_model.path. Nothing is uploaded unless all three are present, so
              a swallowed conversion failure in training can no longer publish a
              new scaler next to the previously deployed model. Failures are
              reported through upload_status; the task itself does not fail.
              """
              import boto3
              import os
              from botocore.exceptions import ClientError

              # Upload order: weights, then graph, then scaler (presumably a server
              # watching these keys reacts to the .xml — TODO confirm).
              required_keys = [
                  "time-to-failure/1/time-to-failure.bin",
                  "time-to-failure/1/time-to-failure.xml",
                  "scalers/ttf_scaler.pkl",
              ]

              try:
                  # Map destination S3 key -> local file; only .xml, .bin and scaler matter
                  local_files = {}
                  for root, dirs, files in os.walk(ttf_model.path):
                      for file in files:
                          file_path = os.path.join(root, file)
                          if file.endswith('.xml'):
                              local_files["time-to-failure/1/time-to-failure.xml"] = file_path
                          elif file.endswith('.bin'):
                              local_files["time-to-failure/1/time-to-failure.bin"] = file_path
                          elif file == "ttf_scaler.pkl":
                              local_files["scalers/ttf_scaler.pkl"] = file_path
                          # Skip other files (.keras, .pb, .index, .data-*)

                  missing = [key for key in required_keys if key not in local_files]
                  if missing:
                      status = f"Error uploading TTF model to S3: model artifact is missing {missing}; nothing uploaded"
                      print(status)
                  else:
                      # Create s3 connection
                      s3_client = boto3.client(
                          's3',
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key,
                          endpoint_url=aws_s3_endpoint
                      )
                      for s3_key in required_keys:
                          s3_client.upload_file(local_files[s3_key], aws_s3_bucket, s3_key)

                      status = f"Successfully uploaded {len(required_keys)} TTF model files to S3"
                      print(status)

              except ClientError as e:
                  status = f"Error uploading TTF model to S3: {e}"
                  print(status)
              except Exception as e:
                  status = f"Unexpected error: {e}"
                  print(status)

              from collections import namedtuple
              output = namedtuple('Outputs', ['upload_status'])
              return output(status)
        image: registry.access.redhat.com/ubi9/python-311:latest
    exec-train-stress-detection-model:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train_stress_detection_model
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'pandas' 'scikit-learn' 'tensorflow' 'openvino' 'joblib' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def train_stress_detection_model(
              prepared_data: Input[Dataset],
              stress_model: Output[Model]
          ) -> NamedTuple('Outputs', [('accuracy', float), ('stress_events', int)]):
              """
              Component 03: Train stress detection model with OpenVINO conversion

              Labels rows with a rule-based stress indicator, trains a small Keras
              MLP on a 80/20 split (random_state=42), and writes to
              stress_model.path: stress_scaler.pkl, model.keras, saved_model/ and,
              if `ovc` succeeds, model.xml/model.bin. Returns held-out accuracy and
              the number of stress rows.
              """
              import pandas as pd
              from sklearn.model_selection import train_test_split
              from sklearn.preprocessing import StandardScaler
              import tensorflow as tf
              from tensorflow import keras
              import os

              # Force CPU usage to avoid CUDA issues
              os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

              # Load data
              df = pd.read_csv(prepared_data.path)

              # Define stress condition (1 = Stress, 0 = Normal)
              def detect_stress(row):
                  if row["batteryCurrent"] > 400 or row["batteryTemp"] > 50 or row["stateOfCharge"] < 0.05 or row["batteryVoltage"] < 320:
                      return 1  # Stress condition
                  return 0  # Normal condition

              # Apply stress detection
              df["stressIndicator"] = df.apply(detect_stress, axis=1)
              stress_events = df["stressIndicator"].sum()

              # Define Features and Target
              features = ["stateOfCharge", "stateOfHealth", "batteryCurrent", "batteryVoltage",
                          "kmh", "distance", "batteryTemp", "ambientTemp", "currentLoad"]
              X = df[features]
              y = df["stressIndicator"]

              # Split Data (validate_stress_model reproduces this exact split)
              X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

              # Normalize data
              scaler = StandardScaler()
              X_train_scaled = scaler.fit_transform(X_train)
              X_test_scaled = scaler.transform(X_test)

              # Define neural network
              mlp_tf = keras.Sequential([
                  keras.layers.Input(shape=(X_train.shape[1],)),
                  keras.layers.Dense(64, activation='relu'),
                  keras.layers.Dense(32, activation='relu'),
                  keras.layers.Dense(1, activation='sigmoid')
              ])

              # Compile model
              mlp_tf.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

              # Train model
              mlp_tf.fit(X_train_scaled, y_train, epochs=50, batch_size=32,
                         validation_split=0.1, verbose=0)

              # Evaluate model
              test_loss, test_accuracy = mlp_tf.evaluate(X_test_scaled, y_test, verbose=0)

              # Create model directory and save
              model_dir = stress_model.path
              os.makedirs(model_dir, exist_ok=True)

              # Save scaler for inference preprocessing
              import joblib
              scaler_path = os.path.join(model_dir, "stress_scaler.pkl")
              joblib.dump(scaler, scaler_path)
              print(f"Scaler saved to {scaler_path}")

              # Save in Keras native format for validation
              keras_model_path = os.path.join(model_dir, "model.keras")
              mlp_tf.save(keras_model_path)

              # Save as SavedModel format for OpenVINO conversion
              saved_model_path = os.path.join(model_dir, "saved_model")
              tf.saved_model.save(mlp_tf, saved_model_path)

              # Convert to OpenVINO format (writes model.xml/model.bin into model_dir).
              # An argument list avoids shell parsing of the artifact paths.
              import subprocess
              try:
                  subprocess.run(
                      ["ovc", saved_model_path, "--output_model", os.path.join(model_dir, "model")],
                      check=True, capture_output=True, text=True
                  )
                  print("Model converted to OpenVINO format (.xml/.bin)")
              except subprocess.CalledProcessError as e:
                  print(f"OpenVINO conversion failed: {e}")
                  # stderr was captured; surface it or the cause is lost.
                  print(f"ovc stderr: {e.stderr}")
                  print("Model saved in TensorFlow format only")
              except Exception as e:
                  print(f"OpenVINO conversion error: {e}")

              print(f"Stress detection model trained with accuracy: {test_accuracy:.4f}")
              print(f"Detected {stress_events} stress events")

              from collections import namedtuple
              output = namedtuple('Outputs', ['accuracy', 'stress_events'])
              return output(float(test_accuracy), int(stress_events))
        image: registry.access.redhat.com/ubi9/python-311:latest
    exec-train-ttf-model:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train_ttf_model
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'pandas' 'scikit-learn' 'tensorflow' 'openvino' 'joblib' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def train_ttf_model(
              prepared_data: Input[Dataset],
              ttf_model: Output[Model]
          ) -> NamedTuple('Outputs', [('mae', float)]):
              """
              Component 05: Train time-to-failure (TTF) model

              Uses hours until the last timestamp in the dataset as a simulated
              target, trains a Keras regressor on a 80/20 split (random_state=42),
              and writes to ttf_model.path: ttf_scaler.pkl, model.keras,
              saved_model/ and, if `ovc` succeeds, model.xml/model.bin. Returns the
              held-out MAE in hours.
              """
              import pandas as pd
              from sklearn.model_selection import train_test_split
              from sklearn.metrics import mean_absolute_error
              from sklearn.preprocessing import StandardScaler
              import tensorflow as tf
              from tensorflow import keras
              import os

              # Force CPU usage
              os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

              # Load data
              df = pd.read_csv(prepared_data.path)

              # Convert timestamp to datetime for time-series processing
              df["timestamp"] = pd.to_datetime(df["timestamp"])

              # Simulate Time-to-Failure (assuming failure happens at the last recorded timestamp)
              df["timeBeforeFailure"] = (df["timestamp"].max() - df["timestamp"]).dt.total_seconds() / 3600  # Convert to hours

              # Define Features and Target for TTF
              features = ["batteryTemp", "batteryCurrent", "batteryVoltage", "stateOfCharge", "stateOfHealth"]
              X = df[features]
              y = df["timeBeforeFailure"]

              # Split Data (validate_ttf_model reproduces this exact split)
              X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

              # Normalize data
              scaler = StandardScaler()
              X_train_scaled = scaler.fit_transform(X_train)
              X_test_scaled = scaler.transform(X_test)

              # Define neural network for regression (3 hidden layers).
              # Use input_shape in first Dense (no explicit Input layer) so SavedModel/OpenVINO
              # input name is "keras_tensor", matching the notebook 05_bms-ttf-training.
              ttf_model_tf = keras.Sequential([
                  keras.layers.Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
                  keras.layers.Dense(64, activation='relu'),
                  keras.layers.Dense(32, activation='relu'),
                  keras.layers.Dense(1)  # No activation for regression
              ])

              # Compile model
              ttf_model_tf.compile(optimizer='adam', loss='mse', metrics=['mae'])

              # Train model
              ttf_model_tf.fit(X_train_scaled, y_train, epochs=50, batch_size=32,
                               validation_split=0.1, verbose=0)

              # Evaluate model
              y_pred = ttf_model_tf.predict(X_test_scaled, verbose=0)
              mae = mean_absolute_error(y_test, y_pred)

              # Create model directory and save
              model_dir = ttf_model.path
              os.makedirs(model_dir, exist_ok=True)

              # Save scaler for inference preprocessing
              import joblib
              scaler_path = os.path.join(model_dir, "ttf_scaler.pkl")
              joblib.dump(scaler, scaler_path)
              print(f"Scaler saved to {scaler_path}")

              # Save in Keras native format for validation
              keras_model_path = os.path.join(model_dir, "model.keras")
              ttf_model_tf.save(keras_model_path)

              # Save as SavedModel using Keras API (same as notebook) so input name is "keras_tensor"
              saved_model_path = os.path.join(model_dir, "saved_model")
              try:
                  ttf_model_tf.export(saved_model_path)
              except AttributeError:
                  ttf_model_tf.save(saved_model_path, save_format="tf")

              # Convert to OpenVINO format (writes model.xml/model.bin into model_dir).
              # An argument list avoids shell parsing of the artifact paths.
              import subprocess
              try:
                  subprocess.run(
                      ["ovc", saved_model_path, "--output_model", os.path.join(model_dir, "model")],
                      check=True, capture_output=True, text=True
                  )
                  print("TTF model converted to OpenVINO format (.xml/.bin)")
              except subprocess.CalledProcessError as e:
                  print(f"OpenVINO conversion failed: {e}")
                  # stderr was captured; surface it or the cause is lost.
                  print(f"ovc stderr: {e.stderr}")
                  print("TTF model saved in TensorFlow format only")
              except Exception as e:
                  print(f"OpenVINO conversion error: {e}")

              print(f"TTF model trained with MAE: {mae:.4f} hours")

              from collections import namedtuple
              output = namedtuple('Outputs', ['mae'])
              return output(float(mae))
        image: registry.access.redhat.com/ubi9/python-311:latest
    exec-validate-stress-model:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - validate_stress_model
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'boto3' 'pandas' 'openvino>=2023.2.0' 'numpy' 'scikit-learn' 'joblib' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def validate_stress_model(
              new_model: Input[Model],
              prepared_data: Input[Dataset],
              new_model_accuracy: float,
              aws_access_key_id: str,
              aws_secret_access_key: str,
              aws_s3_endpoint: str,
              aws_s3_bucket: str
          ) -> NamedTuple('Outputs', [('should_update', str), ('new_accuracy', float), ('current_accuracy', float)]):
              """
              Component: Validate stress model against existing model in S3 using OpenVINO

              The deployed model is scored on the same held-out split that
              train_stress_detection_model used for new_model_accuracy
              (train_test_split, test_size=0.2, random_state=42), preprocessed with
              the deployed model's own scaler (scalers/stress_scaler.pkl) when it
              can be loaded, so the two accuracies are comparable.

              should_update is "true" when no deployed model exists, when the new
              accuracy is strictly higher, or when validation itself errors
              (best-effort default). new_model is not read here.
              """
              import boto3
              import os
              import tempfile
              import pandas as pd
              import numpy as np
              try:
                  from openvino.runtime import Core
              except ImportError:
                  from openvino import Core
              from botocore.exceptions import ClientError
              from sklearn.model_selection import train_test_split

              should_update = "false"
              current_accuracy = 0.0

              try:
                  # Create s3 connection
                  s3_client = boto3.client(
                      's3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      endpoint_url=aws_s3_endpoint
                  )

                  xml_s3_key = "stress-detection/1/stress-detection.xml"
                  bin_s3_key = "stress-detection/1/stress-detection.bin"
                  scaler_s3_key = "scalers/stress_scaler.pkl"

                  # TemporaryDirectory deletes the downloaded files when done.
                  with tempfile.TemporaryDirectory() as temp_dir:
                      xml_local = os.path.join(temp_dir, "stress-detection.xml")
                      bin_local = os.path.join(temp_dir, "stress-detection.bin")
                      scaler_local = os.path.join(temp_dir, "stress_scaler.pkl")

                      try:
                          # Try to download existing OpenVINO model from S3
                          s3_client.download_file(aws_s3_bucket, xml_s3_key, xml_local)
                          s3_client.download_file(aws_s3_bucket, bin_s3_key, bin_local)
                          print(f"Found existing OpenVINO model in S3, downloading for comparison...")

                          # Load existing model with OpenVINO
                          core = Core()
                          existing_model = core.compile_model(xml_local, "CPU")
                          infer_request = existing_model.create_infer_request()

                          # Load test data and evaluate
                          df = pd.read_csv(prepared_data.path)

                          # Define stress condition for labels (must match training)
                          def detect_stress(row):
                              if row["batteryCurrent"] > 400 or row["batteryTemp"] > 50 or row["stateOfCharge"] < 0.05 or row["batteryVoltage"] < 320:
                                  return 1
                              return 0

                          df["stressIndicator"] = df.apply(detect_stress, axis=1)

                          features = ["stateOfCharge", "stateOfHealth", "batteryCurrent", "batteryVoltage",
                                      "kmh", "distance", "batteryTemp", "ambientTemp", "currentLoad"]
                          X = df[features]
                          y = df["stressIndicator"]

                          # Reproduce the held-out split the new model was scored on.
                          _, X_test, _, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
                          y_true = y_test.values

                          # Preprocess with the deployed model's scaler when available.
                          try:
                              s3_client.download_file(aws_s3_bucket, scaler_s3_key, scaler_local)
                              import joblib
                              # NOTE(review): joblib.load unpickles the file; this is only
                              # safe while the bucket is trusted (written by this pipeline).
                              current_scaler = joblib.load(scaler_local)
                              X_normalized = current_scaler.transform(X_test).astype(np.float32)
                              print("Using the current model's scaler from S3")
                          except Exception as scaler_error:
                              print(f"Could not load current scaler ({scaler_error}); normalizing with test-split statistics")
                              X_arr = X_test.values.astype(np.float32)
                              X_normalized = (X_arr - X_arr.mean(axis=0)) / (X_arr.std(axis=0) + 1e-8)

                          # Evaluate existing model using OpenVINO inference
                          correct = 0
                          for i in range(len(X_normalized)):
                              input_data = X_normalized[i:i+1]
                              infer_request.infer({0: input_data})
                              output = infer_request.get_output_tensor(0).data
                              prediction = 1 if output[0][0] > 0.5 else 0
                              if prediction == y_true[i]:
                                  correct += 1

                          current_accuracy = correct / len(y_true)

                          print(f"Current model accuracy: {current_accuracy:.4f}")
                          print(f"New model accuracy: {new_model_accuracy:.4f}")

                          # Compare models (any improvement is enough)
                          if new_model_accuracy > current_accuracy:
                              should_update = "true"
                              print(f"[APPROVED] New model is better! Improvement: {(new_model_accuracy - current_accuracy)*100:.2f}%")
                          else:
                              print(f"[REJECTED] New model not better. Keeping current model.")

                      except ClientError as e:
                          error_code = e.response.get('Error', {}).get('Code', '')
                          if error_code in ['NoSuchKey', '404', 'NotFound']:
                              print("No existing model found in S3, will upload new model")
                              should_update = "true"
                          else:
                              print(f"S3 error: {e}")
                              print("Defaulting to update new model")
                              should_update = "true"

              except Exception as e:
                  print(f"Error during validation: {e}")
                  import traceback
                  traceback.print_exc()
                  print("Defaulting to update new model")
                  should_update = "true"

              from collections import namedtuple
              output = namedtuple('Outputs', ['should_update', 'new_accuracy', 'current_accuracy'])
              return output(should_update, float(new_model_accuracy), float(current_accuracy))
        image: registry.access.redhat.com/ubi9/python-311:latest
    exec-validate-ttf-model:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - validate_ttf_model
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'boto3' 'pandas' 'openvino>=2023.2.0' 'numpy' 'scikit-learn' 'joblib' && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.4' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def validate_ttf_model(
              new_model: Input[Model],
              prepared_data: Input[Dataset],
              new_model_mae: float,
              aws_access_key_id: str,
              aws_secret_access_key: str,
              aws_s3_endpoint: str,
              aws_s3_bucket: str
          ) -> NamedTuple('Outputs', [('should_update', str), ('new_mae', float), ('current_mae', float)]):
              """
              Component: Validate TTF model against existing model in S3 using OpenVINO

              The deployed model is scored on the same held-out split that
              train_ttf_model used for new_model_mae (train_test_split,
              test_size=0.2, random_state=42), preprocessed with the deployed
              model's own scaler (scalers/ttf_scaler.pkl) when it can be loaded.

              should_update is "true" when no deployed model exists, when the new
              MAE is strictly lower, or when validation itself errors (best-effort
              default). current_mae stays 999.0 when no deployed model was scored.
              new_model is not read here.
              """
              import boto3
              import os
              import tempfile
              import pandas as pd
              import numpy as np
              try:
                  from openvino.runtime import Core
              except ImportError:
                  from openvino import Core
              from sklearn.metrics import mean_absolute_error
              from sklearn.model_selection import train_test_split
              from botocore.exceptions import ClientError

              should_update = "false"
              current_mae = 999.0  # High default value

              try:
                  # Create s3 connection
                  s3_client = boto3.client(
                      's3',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      endpoint_url=aws_s3_endpoint
                  )

                  xml_s3_key = "time-to-failure/1/time-to-failure.xml"
                  bin_s3_key = "time-to-failure/1/time-to-failure.bin"
                  scaler_s3_key = "scalers/ttf_scaler.pkl"

                  # TemporaryDirectory deletes the downloaded files when done.
                  with tempfile.TemporaryDirectory() as temp_dir:
                      xml_local = os.path.join(temp_dir, "time-to-failure.xml")
                      bin_local = os.path.join(temp_dir, "time-to-failure.bin")
                      scaler_local = os.path.join(temp_dir, "ttf_scaler.pkl")

                      try:
                          # Try to download existing OpenVINO model from S3
                          s3_client.download_file(aws_s3_bucket, xml_s3_key, xml_local)
                          s3_client.download_file(aws_s3_bucket, bin_s3_key, bin_local)
                          print(f"Found existing TTF OpenVINO model in S3, downloading for comparison...")

                          # Load existing model with OpenVINO
                          core = Core()
                          existing_model = core.compile_model(xml_local, "CPU")
                          infer_request = existing_model.create_infer_request()

                          # Load test data and rebuild the target exactly as in training
                          df = pd.read_csv(prepared_data.path)
                          df["timestamp"] = pd.to_datetime(df["timestamp"])
                          df["timeBeforeFailure"] = (df["timestamp"].max() - df["timestamp"]).dt.total_seconds() / 3600

                          features = ["batteryTemp", "batteryCurrent", "batteryVoltage", "stateOfCharge", "stateOfHealth"]
                          X = df[features]
                          y = df["timeBeforeFailure"]

                          # Reproduce the held-out split the new model was scored on.
                          _, X_test, _, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

                          # Preprocess with the deployed model's scaler when available.
                          try:
                              s3_client.download_file(aws_s3_bucket, scaler_s3_key, scaler_local)
                              import joblib
                              # NOTE(review): joblib.load unpickles the file; this is only
                              # safe while the bucket is trusted (written by this pipeline).
                              current_scaler = joblib.load(scaler_local)
                              X_normalized = current_scaler.transform(X_test).astype(np.float32)
                              print("Using the current model's scaler from S3")
                          except Exception as scaler_error:
                              print(f"Could not load current scaler ({scaler_error}); normalizing with test-split statistics")
                              X_arr = X_test.values.astype(np.float32)
                              X_normalized = (X_arr - X_arr.mean(axis=0)) / (X_arr.std(axis=0) + 1e-8)

                          # Predict using OpenVINO inference
                          predictions = []
                          for i in range(len(X_normalized)):
                              input_data = X_normalized[i:i+1]
                              infer_request.infer({0: input_data})
                              output = infer_request.get_output_tensor(0).data
                              predictions.append(output[0][0])

                          y_pred = np.array(predictions)
                          current_mae = mean_absolute_error(y_test.values, y_pred)

                          print(f"Current model MAE: {current_mae:.4f} hours")
                          print(f"New model MAE: {new_model_mae:.4f} hours")

                          # Compare (lower MAE is better, any improvement is enough)
                          if new_model_mae < current_mae:
                              should_update = "true"
                              improvement = ((current_mae - new_model_mae) / current_mae) * 100
                              print(f"[APPROVED] New model is better! Improvement: {improvement:.2f}%")
                          else:
                              print(f"[REJECTED] New model not better. Keeping current model.")

                      except ClientError as e:
                          error_code = e.response.get('Error', {}).get('Code', '')
                          if error_code in ['NoSuchKey', '404', 'NotFound']:
                              print("No existing TTF model found in S3, will upload new model")
                              should_update = "true"
                          else:
                              print(f"S3 error: {e}")
                              print("Defaulting to update new model")
                              should_update = "true"

              except Exception as e:
                  print(f"Error during TTF validation: {e}")
                  import traceback
                  traceback.print_exc()
                  print("Defaulting to update new model")
                  should_update = "true"

              from collections import namedtuple
              output = namedtuple('Outputs', ['should_update', 'new_mae', 'current_mae'])
              return output(should_update, float(new_model_mae), float(current_mae))
        image: registry.access.redhat.com/ubi9/python-311:latest
pipelineInfo:
  description: Complete ML pipeline for battery stress detection and time-to-failure prediction
  name: battery-ml-pipeline
# NOTE(review): everything from `root:` onward is left exactly as found (still
# whitespace-collapsed); regenerate it by recompiling the pipeline source.
root: dag: tasks: condition-1: componentRef: name: comp-condition-1 dependentTasks: - train-stress-detection-model - validate-stress-model inputs: artifacts: pipelinechannel--train-stress-detection-model-stress_model: taskOutputArtifact: outputArtifactKey: stress_model producerTask: train-stress-detection-model parameters: pipelinechannel--aws_access_key_id: componentInputParameter: aws_access_key_id pipelinechannel--aws_s3_bucket: componentInputParameter: aws_s3_bucket pipelinechannel--aws_s3_endpoint: componentInputParameter: aws_s3_endpoint pipelinechannel--aws_secret_access_key: componentInputParameter: aws_secret_access_key pipelinechannel--validate-stress-model-should_update: taskOutputParameter: outputParameterKey: should_update producerTask: validate-stress-model taskInfo: name: stress-model-approved triggerPolicy: condition: inputs.parameter_values['pipelinechannel--validate-stress-model-should_update'] == 'true' condition-2: componentRef: name: comp-condition-2 dependentTasks: -
train-ttf-model - validate-ttf-model inputs: artifacts: pipelinechannel--train-ttf-model-ttf_model: taskOutputArtifact: outputArtifactKey: ttf_model producerTask: train-ttf-model parameters: pipelinechannel--aws_access_key_id: componentInputParameter: aws_access_key_id pipelinechannel--aws_s3_bucket: componentInputParameter: aws_s3_bucket pipelinechannel--aws_s3_endpoint: componentInputParameter: aws_s3_endpoint pipelinechannel--aws_secret_access_key: componentInputParameter: aws_secret_access_key pipelinechannel--validate-ttf-model-should_update: taskOutputParameter: outputParameterKey: should_update producerTask: validate-ttf-model taskInfo: name: ttf-model-approved triggerPolicy: condition: inputs.parameter_values['pipelinechannel--validate-ttf-model-should_update'] == 'true' prepare-data: cachingOptions: enableCache: true componentRef: name: comp-prepare-data dependentTasks: - retrieve-influx-data inputs: artifacts: raw_data: taskOutputArtifact: outputArtifactKey: raw_data producerTask: retrieve-influx-data taskInfo: name: prepare-data retrieve-influx-data: cachingOptions: enableCache: true componentRef: name: comp-retrieve-influx-data inputs: parameters: influxdb_bucket: componentInputParameter: influxdb_bucket influxdb_org: componentInputParameter: influxdb_org influxdb_token: componentInputParameter: influxdb_token influxdb_url: componentInputParameter: influxdb_url taskInfo: name: retrieve-influx-data train-stress-detection-model: cachingOptions: enableCache: true componentRef: name: comp-train-stress-detection-model dependentTasks: - prepare-data inputs: artifacts: prepared_data: taskOutputArtifact: outputArtifactKey: prepared_data producerTask: prepare-data taskInfo: name: train-stress-detection-model train-ttf-model: cachingOptions: enableCache: true componentRef: name: comp-train-ttf-model dependentTasks: - prepare-data inputs: artifacts: prepared_data: taskOutputArtifact: outputArtifactKey: prepared_data producerTask: prepare-data taskInfo: name: 
train-ttf-model validate-stress-model: cachingOptions: enableCache: true componentRef: name: comp-validate-stress-model dependentTasks: - prepare-data - train-stress-detection-model inputs: artifacts: new_model: taskOutputArtifact: outputArtifactKey: stress_model producerTask: train-stress-detection-model prepared_data: taskOutputArtifact: outputArtifactKey: prepared_data producerTask: prepare-data parameters: aws_access_key_id: componentInputParameter: aws_access_key_id aws_s3_bucket: componentInputParameter: aws_s3_bucket aws_s3_endpoint: componentInputParameter: aws_s3_endpoint aws_secret_access_key: componentInputParameter: aws_secret_access_key new_model_accuracy: taskOutputParameter: outputParameterKey: accuracy producerTask: train-stress-detection-model taskInfo: name: validate-stress-model validate-ttf-model: cachingOptions: enableCache: true componentRef: name: comp-validate-ttf-model dependentTasks: - prepare-data - train-ttf-model inputs: artifacts: new_model: taskOutputArtifact: outputArtifactKey: ttf_model producerTask: train-ttf-model prepared_data: taskOutputArtifact: outputArtifactKey: prepared_data producerTask: prepare-data parameters: aws_access_key_id: componentInputParameter: aws_access_key_id aws_s3_bucket: componentInputParameter: aws_s3_bucket aws_s3_endpoint: componentInputParameter: aws_s3_endpoint aws_secret_access_key: componentInputParameter: aws_secret_access_key new_model_mae: taskOutputParameter: outputParameterKey: mae producerTask: train-ttf-model taskInfo: name: validate-ttf-model inputDefinitions: parameters: aws_access_key_id: defaultValue: minio isOptional: true parameterType: STRING aws_s3_bucket: defaultValue: inference isOptional: true parameterType: STRING aws_s3_endpoint: defaultValue: http://minio-microshift-vm.microshift-001.svc.cluster.local:30000 isOptional: true parameterType: STRING aws_secret_access_key: defaultValue: minio123 isOptional: true parameterType: STRING influxdb_bucket: defaultValue: bms isOptional: true 
parameterType: STRING influxdb_org: defaultValue: redhat isOptional: true parameterType: STRING influxdb_token: defaultValue: admin_token isOptional: true parameterType: STRING influxdb_url: defaultValue: https://influxdb-battery-demo.apps.replace-domain.io/ isOptional: true parameterType: STRING schemaVersion: 2.1.0 sdkVersion: kfp-2.14.4