---
name: ML Pipeline Automation
description: Build end-to-end ML pipelines with automated data processing, training, validation, and deployment using Airflow, Kubeflow, and Jenkins
---

# ML Pipeline Automation

ML pipeline automation orchestrates the entire machine learning workflow from data ingestion through model deployment, ensuring reproducibility, scalability, and reliability.

## Pipeline Components

- **Data Ingestion**: Collecting data from multiple sources
- **Data Processing**: Cleaning, transformation, feature engineering
- **Model Training**: Training and hyperparameter tuning
- **Validation**: Cross-validation and testing
- **Deployment**: Moving models to production
- **Monitoring**: Tracking performance metrics

## Orchestration Platforms

- **Apache Airflow**: Workflow scheduling with DAGs
- **Kubeflow**: Kubernetes-native ML workflows
- **Jenkins**: CI/CD for ML pipelines
- **Prefect**: Modern data flow orchestration
- **Dagster**: Asset-driven orchestration

## Python Implementation

```python
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib
import logging
from datetime import datetime
import json
import os
import shutil

# Airflow imports
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# MLflow for tracking
import mlflow
import mlflow.sklearn

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Settings shared between tasks. train_model and validate_model must use the
# same split parameters so that validation scores the same hold-out rows.
TARGET_COLUMN = 'target'
TEST_SIZE = 0.2
RANDOM_STATE = 42
ACCURACY_THRESHOLD = 0.85

print("=== 1. Modular Pipeline Functions ===")


# Data ingestion
def ingest_data(**context):
    """Generate a synthetic classification dataset and write it to CSV.

    Args:
        **context: Task context. ``context['task_instance']`` must provide
            ``xcom_push(key=..., value=...)``.

    Returns:
        dict: ``{'status': 'success', 'samples': <row count>}``.
    """
    logger.info("Starting data ingestion...")

    X, y = make_classification(n_samples=2000, n_features=30,
                               n_informative=20, random_state=RANDOM_STATE)
    data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
    data[TARGET_COLUMN] = y

    # Save to disk
    data_path = '/tmp/raw_data.csv'
    data.to_csv(data_path, index=False)

    # Publish the location so the next task can find the file.
    context['task_instance'].xcom_push(key='data_path', value=data_path)
    logger.info("Data ingested: %d rows", len(data))
    return {'status': 'success', 'samples': len(data)}


# Data processing
def process_data(**context):
    """Clean the raw data: impute, de-duplicate and drop feature outliers.

    Only feature columns are imputed and outlier-filtered. The label column
    is left untouched: imputing it with a mean would create fractional
    labels, and an IQR filter on an imbalanced binary label (IQR of 0) would
    silently delete every minority-class row.

    Args:
        **context: Task context. ``context['task_instance']`` must provide
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``{'status': 'success', 'rows_remaining': <row count>}``.
    """
    logger.info("Starting data processing...")

    # Get data path from previous task
    task_instance = context['task_instance']
    data_path = task_instance.xcom_pull(key='data_path', task_ids='ingest_data')
    data = pd.read_csv(data_path)

    feature_cols = [
        col for col in data.select_dtypes(include=[np.number]).columns
        if col != TARGET_COLUMN
    ]

    # Handle missing values: fill features with their column mean (columns
    # absent from the Series are not filled); rows without a label are
    # unusable for supervised training, so drop them.
    data = data.fillna(data[feature_cols].mean())
    data = data.dropna(subset=[TARGET_COLUMN])

    # Remove duplicates
    data = data.drop_duplicates()

    # Remove outliers (simple approach): keep rows within 1.5 * IQR of the
    # quartiles, one feature at a time. Quantiles are recomputed on the
    # already-filtered data at each step.
    for col in feature_cols:
        q1 = data[col].quantile(0.25)
        q3 = data[col].quantile(0.75)
        iqr = q3 - q1
        in_range = (data[col] >= q1 - 1.5 * iqr) & (data[col] <= q3 + 1.5 * iqr)
        data = data[in_range]

    processed_path = '/tmp/processed_data.csv'
    data.to_csv(processed_path, index=False)

    task_instance.xcom_push(key='processed_path', value=processed_path)
    logger.info("Data processed: %d rows after cleaning", len(data))
    return {'status': 'success', 'rows_remaining': len(data)}


# Feature engineering
def engineer_features(**context):
    """Add pairwise interaction and squared features to the processed data.

    Args:
        **context: Task context. ``context['task_instance']`` must provide
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``{'status': 'success', 'features': <column count>}``. The
        count is the total number of columns written, label included.
    """
    logger.info("Starting feature engineering...")

    task_instance = context['task_instance']
    processed_path = task_instance.xcom_pull(key='processed_path',
                                             task_ids='process_data')
    data = pd.read_csv(processed_path)

    # Create interaction features: products of pairs (i, j) with i < j,
    # i among the first five and j among the first six feature columns.
    feature_cols = [col for col in data.columns if col.startswith('feature_')]
    for i in range(min(5, len(feature_cols))):
        for j in range(i + 1, min(6, len(feature_cols))):
            data[f'interaction_{i}_{j}'] = (
                data[feature_cols[i]] * data[feature_cols[j]]
            )

    # Create polynomial features
    for col in feature_cols[:5]:
        data[f'{col}_squared'] = data[col] ** 2

    engineered_path = '/tmp/engineered_data.csv'
    data.to_csv(engineered_path, index=False)

    task_instance.xcom_push(key='engineered_path', value=engineered_path)
    logger.info("Features engineered: %d total features", len(data.columns))
    return {'status': 'success', 'features': len(data.columns)}


# Train model
def train_model(**context):
    """Train a random forest, persist model and scaler, and log to MLflow.

    Args:
        **context: Task context. ``context['task_instance']`` must provide
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``{'status': 'success', 'accuracy': float, 'f1_score': float}``
        measured on the hold-out split.
    """
    logger.info("Starting model training...")

    task_instance = context['task_instance']
    engineered_path = task_instance.xcom_pull(key='engineered_path',
                                              task_ids='engineer_features')
    data = pd.read_csv(engineered_path)

    X = data.drop(TARGET_COLUMN, axis=1)
    y = data[TARGET_COLUMN]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE)

    # Scale features (fit on the training rows only to avoid leakage)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Train model. The hyperparameters live in one dict so the values
    # logged to MLflow can never drift from the ones actually used.
    model_params = {'n_estimators': 100, 'max_depth': 15}
    model = RandomForestClassifier(random_state=RANDOM_STATE, **model_params)
    model.fit(X_train_scaled, y_train)

    # Evaluate (cast to plain floats so results serialize cleanly)
    y_pred = model.predict(X_test_scaled)
    accuracy = float(accuracy_score(y_test, y_pred))
    f1 = float(f1_score(y_test, y_pred))

    # Save model
    model_path = '/tmp/model.pkl'
    scaler_path = '/tmp/scaler.pkl'
    joblib.dump(model, model_path)
    joblib.dump(scaler, scaler_path)

    task_instance.xcom_push(key='model_path', value=model_path)
    task_instance.xcom_push(key='scaler_path', value=scaler_path)

    # Log to MLflow
    with mlflow.start_run():
        for param_name, param_value in model_params.items():
            mlflow.log_param(param_name, param_value)
        mlflow.log_metric('accuracy', accuracy)
        mlflow.log_metric('f1_score', f1)
        mlflow.sklearn.log_model(model, 'model')

    logger.info("Model trained: Accuracy=%.4f, F1=%.4f", accuracy, f1)
    return {'status': 'success', 'accuracy': accuracy, 'f1_score': f1}


# Validate model
def validate_model(**context):
    """Score the saved model on the hold-out split against the threshold.

    The split is rebuilt with the same ``TEST_SIZE`` and ``RANDOM_STATE``
    that ``train_model`` used, so the model is scored on rows it was not
    fitted on.

    Args:
        **context: Task context. ``context['task_instance']`` must provide
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``status`` (``'success'`` when accuracy is strictly above the
        threshold, otherwise ``'failed'``), ``accuracy``, ``threshold`` and
        an ISO-format ``timestamp``.
    """
    logger.info("Starting model validation...")

    task_instance = context['task_instance']
    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    engineered_path = task_instance.xcom_pull(key='engineered_path',
                                              task_ids='engineer_features')
    scaler_path = task_instance.xcom_pull(key='scaler_path',
                                          task_ids='train_model')

    model = joblib.load(model_path)
    scaler = joblib.load(scaler_path)
    data = pd.read_csv(engineered_path)

    X = data.drop(TARGET_COLUMN, axis=1)
    y = data[TARGET_COLUMN]
    _, X_test, _, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE)
    X_test_scaled = scaler.transform(X_test)

    # Validate
    y_pred = model.predict(X_test_scaled)
    accuracy = float(accuracy_score(y_test, y_pred))

    validation_result = {
        'status': 'success' if accuracy > ACCURACY_THRESHOLD else 'failed',
        'accuracy': accuracy,
        'threshold': ACCURACY_THRESHOLD,
        'timestamp': datetime.now().isoformat()
    }

    # Pushed as a JSON string; deploy_model decodes it with json.loads.
    task_instance.xcom_push(key='validation_result',
                            value=json.dumps(validation_result))
    logger.info("Validation result: %s", validation_result)
    return validation_result


# Deploy model
def deploy_model(**context):
    """Copy the model and scaler to the deploy directory if validation passed.

    Args:
        **context: Task context. ``context['task_instance']`` must provide
            ``xcom_pull``.

    Returns:
        dict: ``{'status': 'success', 'deploy_path': ...}`` on deployment, or
        ``{'status': 'skipped', 'reason': 'validation_failed'}`` when the
        validation status is not ``'success'``.
    """
    logger.info("Starting model deployment...")

    task_instance = context['task_instance']
    validation_result = json.loads(task_instance.xcom_pull(
        key='validation_result', task_ids='validate_model'))

    if validation_result['status'] != 'success':
        logger.warning("Validation failed, deployment skipped")
        return {'status': 'skipped', 'reason': 'validation_failed'}

    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    scaler_path = task_instance.xcom_pull(key='scaler_path',
                                          task_ids='train_model')

    # Simulate deployment
    deploy_path = '/tmp/deployed_model/'
    os.makedirs(deploy_path, exist_ok=True)

    shutil.copy(model_path, os.path.join(deploy_path, 'model.pkl'))
    shutil.copy(scaler_path, os.path.join(deploy_path, 'scaler.pkl'))

    logger.info("Model deployed to %s", deploy_path)
    return {'status': 'success', 'deploy_path': deploy_path}


# 2. Airflow DAG Definition
print("\n=== 2. Airflow DAG ===")

# Illustrative DAG source kept as a string; it is not executed here. To use
# it as a real DAG file, the task callables above must be importable from it.
dag_definition = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_pipeline_dag',
    default_args=default_args,
    description='End-to-end ML pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=ingest_data,
    )

    # Task 2: Process Data
    process = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

    # Task 3: Engineer Features
    engineer = PythonOperator(
        task_id='engineer_features',
        python_callable=engineer_features,
    )

    # Task 4: Train Model
    train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )

    # Task 5: Validate Model
    validate = PythonOperator(
        task_id='validate_model',
        python_callable=validate_model,
    )

    # Task 6: Deploy Model
    deploy = PythonOperator(
        task_id='deploy_model',
        python_callable=deploy_model,
    )

    # Define dependencies
    ingest >> process >> engineer >> train >> validate >> deploy
'''

print("Airflow DAG defined with 6 tasks")

# 3. Pipeline execution summary
print("\n=== 3. Pipeline Execution ===")


class PipelineOrchestrator:
    """Run the pipeline tasks sequentially in-process.

    The instance is passed to each task as ``task_instance`` and supplies
    minimal ``xcom_push`` / ``xcom_pull`` methods backed by a dict, so the
    task functions can run without an Airflow scheduler.
    """

    def __init__(self):
        self.execution_log = []   # (task name, task return value) pairs
        self.start_time = None
        self.end_time = None
        self.xcom_storage = {}    # key -> value shared between tasks
        self.status = 'not_started'
        self.error = None         # message of the exception that stopped the run

    def run_pipeline(self):
        """Execute every task in order, stopping at the first exception.

        Failures are logged with a traceback and recorded in ``status`` and
        ``error`` rather than re-raised, so a summary can still be printed.
        ``end_time`` is always set, on success or failure.
        """
        tasks = (
            ('ingest_data', ingest_data),
            ('process_data', process_data),
            ('engineer_features', engineer_features),
            ('train_model', train_model),
            ('validate_model', validate_model),
            ('deploy_model', deploy_model),
        )

        self.start_time = datetime.now()
        self.status = 'running'
        self.error = None
        logger.info("Starting ML pipeline execution")

        try:
            # Execute pipeline tasks
            for task_name, task in tasks:
                result = task(task_instance=self)
                self.execution_log.append((task_name, result))
        except Exception as e:
            self.status = 'failed'
            self.error = str(e)
            logger.exception("Pipeline execution failed: %s", e)
        else:
            self.status = 'success'
            logger.info("Pipeline execution completed successfully")
        finally:
            self.end_time = datetime.now()

    def xcom_push(self, key, value):
        """Store ``value`` under ``key`` for later tasks."""
        self.xcom_storage[key] = value

    def xcom_pull(self, key, task_ids):
        """Return the value stored under ``key``, or ``None`` if absent.

        ``task_ids`` is accepted for signature compatibility with the calls
        made by the task functions but is not used: storage is keyed by
        ``key`` alone.
        """
        return self.xcom_storage.get(key)

    def get_summary(self):
        """Return timing, status and per-task results of the last run."""
        duration = (
            (self.end_time - self.start_time).total_seconds()
            if self.end_time else 0
        )
        return {
            'start_time': self.start_time.isoformat() if self.start_time else None,
            'end_time': self.end_time.isoformat() if self.end_time else None,
            'duration_seconds': duration,
            'status': self.status,
            'error': self.error,
            'tasks_executed': len(self.execution_log),
            'execution_log': self.execution_log
        }


# Execute pipeline
orchestrator = PipelineOrchestrator()
orchestrator.run_pipeline()
summary = orchestrator.get_summary()

print("\n=== Pipeline Summary ===")
for key, value in summary.items():
    if key != 'execution_log':
        print(f"{key}: {value}")

print("\nTask Execution Log:")
for task_name, result in summary['execution_log']:
    print(f" {task_name}: {result}")

print("\nML pipeline automation setup completed!")
```

## Pipeline Best Practices

- **Modularity**: Each step should be independent
- **Idempotency**: Tasks should be safely repeatable
- **Error Handling**: Graceful degradation and alerting
- **Versioning**: Track data, code, and model versions
- **Monitoring**: Track execution metrics and logs

## Scheduling Strategies

- **Daily**: Standard for daily retraining
- **Weekly**: For larger feature engineering
- **On-demand**: Triggered by data updates
- **Real-time**: For streaming applications

## Deliverables

- Automated pipeline DAG
- Task dependency graph
- Execution logs and monitoring
- Performance metrics
- Rollback procedures
- Documentation