---
name: Model Deployment
description: Deploy machine learning models to production using Flask, FastAPI, Docker, cloud platforms (AWS, GCP, Azure), and model serving frameworks
---

# Model Deployment

## Overview

Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.

## When to Use

- When productionizing trained models for real-world inference and predictions
- When building REST APIs or web services for model serving
- When scaling predictions to serve multiple users or applications
- When deploying models to cloud platforms, edge devices, or containers
- When implementing CI/CD pipelines for ML model updates
- When creating batch processing systems for large-scale predictions

## Deployment Approaches

- **REST APIs**: Flask, FastAPI for synchronous inference
- **Batch Processing**: Scheduled jobs for large-scale predictions
- **Real-time Streaming**: Kafka, Spark Streaming for continuous data
- **Serverless**: AWS Lambda, Google Cloud Functions
- **Edge Deployment**: TensorFlow Lite, ONNX for edge devices
- **Model Serving**: TensorFlow Serving, Seldon Core, BentoML

## Key Considerations

- **Model Format**: Pickle, SavedModel, ONNX, PMML (see the export sketch below)
- **Scalability**: Load balancing, auto-scaling
- **Latency**: Response time requirements
- **Monitoring**: Model drift, performance metrics
- **Versioning**: Multiple model versions in production
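The pickle/joblib format used in the implementation below ties the artifact to a specific Python and scikit-learn version; ONNX is a more portable option. A minimal sketch of exporting a scikit-learn classifier to ONNX, assuming the optional `skl2onnx` and `onnxruntime` packages are installed (the path and input shape are illustrative):

```python
# Minimal sketch: export a scikit-learn classifier to ONNX for portable serving.
# Assumes `pip install skl2onnx onnxruntime`; path and input shape are illustrative.
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import onnxruntime as ort

X, y = make_classification(n_samples=200, n_features=20, random_state=42)
clf = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)

# Declare the input signature (batches of 20 float features) and convert
onnx_model = convert_sklearn(clf, initial_types=[("input", FloatTensorType([None, 20]))])
with open("/tmp/model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())

# Sanity-check the exported graph with onnxruntime
session = ort.InferenceSession("/tmp/model.onnx", providers=["CPUExecutionProvider"])
labels = session.run(None, {"input": X[:3].astype(np.float32)})[0]
print(labels)
```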
## Python Implementation

```python
import numpy as np
import pandas as pd
import pickle
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib

# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn

# For model serving
import mlflow.pyfunc
import mlflow.sklearn

# Docker and deployment
import logging
import time
from typing import List, Dict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Train and Save Model ===")

# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)

print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")

# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")


class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)
            self.predictions_count += len(X)
            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }


# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)

# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)


class PredictionRequest(BaseModel):
    # Each row must contain 20 features, matching the training data
    features: List[List[float]] = Field(..., example=[[1.0, 2.0, 3.0]])


class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float


class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest],
                        background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)
    result = predictor.predict(all_features)
    background_tasks.add_task(
        logger.info, f"Batch prediction processed: {result['count']} samples")
    return result


@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }


# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")

dockerfile_content = '''FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.pkl .
COPY scaler.pkl .
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

print("Dockerfile content:")
print(dockerfile_content)

# 5. Requirements file
print("\n=== 5. Requirements.txt ===")

requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""

print("Requirements:")
print(requirements)

# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")

docker_compose = '''version: '3.8'

services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"

  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''

print("Docker Compose content:")
print(docker_compose)

# 7. Testing the API
print("\n=== 7. Testing the API ===")


def test_predictor():
    # Test single prediction (20 features, matching the training data)
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                      1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]
    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")

    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")

    # Batch predictions
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")


test_predictor()

# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")

# Log model, parameters, and a training metric to MLflow
with mlflow.start_run():
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("train_accuracy", model.score(X_scaled, y))
    model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
    print(f"Model logged to MLflow: {model_uri}")

# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")


class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

    def log_prediction(self, features, prediction, latency):
        self.latencies.append(latency)
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        # Compare the mean of the last 100 predictions against the historical mean
        if len(self.predictions) <= 100:
            return {'drift_detected': False}
        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
        recent_mean = np.mean(recent_predictions)
        drift = abs(recent_mean - historical_mean) > 0.1
        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}
        return {
            'avg_latency_ms': np.mean(self.latencies) * 1000,
            'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
            'total_predictions': len(self.predictions)
        }


monitor = ModelMonitor()

print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")
```
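Once the service is running (locally via `uvicorn app:app --reload`, or in the container built from the Dockerfile above), it can be exercised from any HTTP client. A minimal client sketch, assuming the API is reachable at `http://localhost:8000` and the `requests` package is installed:

```python
# Minimal client sketch for the /health and /predict endpoints defined above.
# Assumes the API is running locally on port 8000 and `requests` is installed.
import requests

BASE_URL = "http://localhost:8000"

# Each row must contain 20 features, matching the training data
payload = {"features": [[0.5] * 20, [1.5] * 20]}

health = requests.get(f"{BASE_URL}/health", timeout=5)
print(health.json())  # e.g. {'status': 'healthy', 'uptime': ..., 'predictions': ...}

response = requests.post(f"{BASE_URL}/predict", json=payload, timeout=5)
response.raise_for_status()
result = response.json()
print(result["predictions"], result["count"])
```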
## Deployment Checklist

- Model format and serialization
- Input/output validation
- Error handling and logging
- Authentication and security
- Rate limiting and throttling
- Health check endpoints
- Monitoring and alerting
- Version management
- Rollback procedures

## Cloud Deployment Options

- **AWS**: SageMaker, Lambda, EC2
- **GCP**: Vertex AI, Cloud Run, App Engine
- **Azure**: Machine Learning, App Service
- **Kubernetes**: Self-managed or on-premises

## Performance Optimization

- Model quantization for smaller size
- Caching predictions (see the sketch below)
- Batch processing
- GPU acceleration
- Request pooling
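Prediction caching pays off when the same feature vectors arrive repeatedly. A minimal sketch of the idea using `functools.lru_cache`; the key scheme (rounded tuples) and cache size are illustrative choices, not part of the API above:

```python
# Minimal sketch of prediction caching: identical feature rows are computed once.
# The key scheme (rounded tuple of floats) and cache size are illustrative.
from functools import lru_cache
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=200, n_features=20, random_state=0)
model = RandomForestClassifier(n_estimators=50, random_state=0).fit(X, y)


@lru_cache(maxsize=10_000)
def cached_predict(row: tuple) -> int:
    # lru_cache needs hashable arguments, so rows are passed as tuples
    return int(model.predict(np.array(row).reshape(1, -1))[0])


def predict_rows(rows):
    return [cached_predict(tuple(round(v, 6) for v in row)) for row in rows]


print(predict_rows([list(X[0]), list(X[0]), list(X[1])]))  # second row hits the cache
print(cached_predict.cache_info())
```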
## Deliverables

- Deployed model endpoint
- API documentation
- Docker configuration
- Monitoring dashboard
- Deployment guide
- Performance benchmarks
- Scaling recommendations