
MLOps: Deploying ML Models to Production

Learn the ML lifecycle with experiment tracking, model versioning, serving with FastAPI, containerization, and monitoring for drift

What is MLOps?

MLOps (Machine Learning Operations) bridges the gap between ML development and production deployment. While data scientists build models in notebooks, MLOps engineers ensure those models run reliably at scale with monitoring, versioning, and automated retraining.

The MLOps Reality:

Only ~10% of the code in a production ML system is the actual model. The other ~90% is data pipelines, monitoring, serving infrastructure, testing, and automation.

ML Lifecycle

1. Data Collection & Preparation
   ETL pipelines, data validation, feature engineering, data versioning (DVC; see the sketch after this list)

2. Experimentation & Training
   Experiment tracking, hyperparameter tuning, model selection, reproducibility

3. Model Packaging & Registry
   Model serialization, versioning, artifact storage, model cards

4. Deployment & Serving
   REST APIs, batch inference, A/B testing, canary deployments, scaling

5. Monitoring & Retraining
   Data drift detection, model performance monitoring, automated retraining triggers
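
Step 1 calls out data versioning with DVC. A minimal sketch of that workflow, shown as shell commands in comments (the bucket and file names are assumptions):

# Version a dataset with DVC (run inside a Git repo):
# dvc init
# dvc remote add -d storage s3://my-bucket/dvc-store  # hypothetical remote
# dvc add data/train.csv        # creates data/train.csv.dvc, ignores the raw file
# git add data/train.csv.dvc data/.gitignore
# git commit -m "Track training data v1"
# dvc push                      # upload the data to the remote
# Teammates reproduce with: git pull && dvc pull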

Experiment Tracking with MLflow

import mlflow
import mlflow.pytorch
import torch

# Start MLflow tracking server: mlflow server --port 5000

# Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("image-classifier-v2")

# Run an experiment
with mlflow.start_run(run_name="resnet50-adamw"):
    # Log hyperparameters
    mlflow.log_param("model", "resnet50")
    mlflow.log_param("optimizer", "AdamW")
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 20)

    # Training loop (simplified)
    for epoch in range(20):
        train_loss = 0.5 * (0.95 ** epoch)  # simulated
        val_accuracy = min(0.95, 0.6 + epoch * 0.02)

        # Log metrics per epoch
        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_accuracy", val_accuracy, step=epoch)

    # Log the trained model
    # mlflow.pytorch.log_model(model, "model")

    # Log artifacts (plots, configs, etc.)
    # mlflow.log_artifact("confusion_matrix.png")

    # Log final metrics
    mlflow.log_metric("final_accuracy", 0.94)
    mlflow.log_metric("final_loss", 0.18)

print("Experiment tracked! View at http://localhost:5000")
# Compare runs, view metrics, download models from the UI
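
Lifecycle step 3 (packaging and registry) is also covered by MLflow's Model Registry. A minimal sketch, assuming MLflow 2.x; the run ID and registry name are placeholders:

import mlflow
from mlflow.tracking import MlflowClient

mlflow.set_tracking_uri("http://localhost:5000")

# Register the model artifact that a finished run logged under "model"
run_id = "<run-id-from-the-ui>"  # placeholder
version = mlflow.register_model(f"runs:/{run_id}/model", "image-classifier")

# Point a "champion" alias at this version so serving code never
# hard-codes version numbers
client = MlflowClient()
client.set_registered_model_alias("image-classifier", "champion", version.version)

# Serving code loads whatever the alias currently points to
model = mlflow.pytorch.load_model("models:/image-classifier@champion")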

Serving Models with FastAPI

from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel
import torch
import torchvision.transforms as transforms
from PIL import Image
import io

app = FastAPI(title="Image Classification API")

# Load model at startup
model = None

# on_event is deprecated in newer FastAPI; lifespan handlers are the
# modern equivalent, but this still works
@app.on_event("startup")
async def load_model():
    global model
    # weights_only=False because the file holds a full pickled model,
    # not just a state_dict (required explicitly since PyTorch 2.6)
    model = torch.load("model.pt", map_location="cpu", weights_only=False)
    model.eval()
    print("Model loaded successfully!")

# Preprocessing pipeline
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

class PredictionResponse(BaseModel):
    label: str
    confidence: float
    class_id: int

@app.post("/predict", response_model=PredictionResponse)
async def predict(file: UploadFile = File(...)):
    """Classify an uploaded image."""
    # Read and preprocess image
    image_data = await file.read()
    image = Image.open(io.BytesIO(image_data)).convert("RGB")
    tensor = transform(image).unsqueeze(0)  # add batch dim

    # Run inference
    with torch.no_grad():
        outputs = model(tensor)
        probabilities = torch.softmax(outputs, dim=1)
        confidence, class_id = probabilities.max(dim=1)

    labels = ["cat", "dog", "bird", "fish", "other"]  # your classes
    return PredictionResponse(
        label=labels[class_id.item()],
        confidence=confidence.item(),
        class_id=class_id.item()
    )

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}

# Run: uvicorn app:app --host 0.0.0.0 --port 8000
# Test: curl -X POST -F "file=@cat.jpg" http://localhost:8000/predict
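
The same request from Python, handy as a smoke test in CI (the file name is illustrative):

import requests

with open("cat.jpg", "rb") as f:
    resp = requests.post("http://localhost:8000/predict", files={"file": f})
resp.raise_for_status()
print(resp.json())  # e.g. {"label": "cat", "confidence": 0.97, "class_id": 0}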

Containerizing ML Models

# Dockerfile for ML model serving
# ---- Dockerfile ----
# FROM python:3.11-slim
#
# WORKDIR /app
#
# # curl is needed for the HEALTHCHECK below (slim images don't include it)
# RUN apt-get update && apt-get install -y --no-install-recommends curl \
#     && rm -rf /var/lib/apt/lists/*
#
# # Install dependencies
# COPY requirements.txt .
# RUN pip install --no-cache-dir -r requirements.txt
#
# # Copy model and application
# COPY model.pt .
# COPY app.py .
#
# # Expose port
# EXPOSE 8000
#
# # Health check
# HEALTHCHECK --interval=30s --timeout=10s \
#   CMD curl -f http://localhost:8000/health || exit 1
#
# # Run the server
# CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
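
The Dockerfile copies a requirements.txt; a plausible minimal set for this service (names are assumptions, pin exact versions in practice):

# ---- requirements.txt ----
# fastapi
# uvicorn[standard]
# torch
# torchvision
# pillow
# python-multipart   # FastAPI needs this for multipart file uploads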

# docker-compose.yml for full ML serving stack
# version: "3.8"
# services:
#   ml-api:
#     build: .
#     ports:
#       - "8000:8000"
#     volumes:
#       - ./models:/app/models  # mount model directory
#     environment:
#       - MODEL_PATH=/app/models/latest.pt
#     deploy:
#       resources:
#         limits:
#           memory: 4G
#         reservations:
#           devices:
#             - driver: nvidia
#               count: 1
#               capabilities: [gpu]
#
#   prometheus:
#     image: prom/prometheus
#     ports:
#       - "9090:9090"
#     volumes:
#       - ./prometheus.yml:/etc/prometheus/prometheus.yml

print("Build: docker build -t ml-api .")
print("Run:   docker run -p 8000:8000 ml-api")
print("Scale: docker-compose up --scale ml-api=3")

Monitoring Model Drift

import numpy as np
from scipy import stats

class DriftDetector:
    """Monitor for data and concept drift in production models."""

    def __init__(self, reference_data, threshold=0.05):
        self.reference = reference_data
        self.threshold = threshold

    def detect_data_drift(self, new_data, feature_names=None):
        """Kolmogorov-Smirnov test for distribution shift."""
        results = {}
        for i in range(new_data.shape[1]):
            stat, p_value = stats.ks_2samp(
                self.reference[:, i], new_data[:, i]
            )
            name = feature_names[i] if feature_names else f"feature_{i}"
            results[name] = {
                "statistic": stat,
                "p_value": p_value,
                "drift_detected": p_value < self.threshold
            }
        return results

    def detect_prediction_drift(self, reference_preds, new_preds):
        """Check if model predictions have shifted."""
        stat, p_value = stats.ks_2samp(reference_preds, new_preds)
        return {
            "statistic": stat,
            "p_value": p_value,
            "drift_detected": p_value < self.threshold
        }

# Usage
np.random.seed(42)
reference = np.random.randn(1000, 5)
new_data = np.random.randn(500, 5) + 0.5  # shifted data!

detector = DriftDetector(reference)
drift = detector.detect_data_drift(
    new_data,
    feature_names=["age", "income", "score", "clicks", "time"]
)
for feature, result in drift.items():
    status = "DRIFT!" if result["drift_detected"] else "OK"
    print(f"  {feature:10s}: p={result['p_value']:.4f} [{status}]")

Key Takeaways

  • MLOps is 90% infrastructure and 10% model code in production systems
  • Always track experiments (MLflow, W&B) so you can reproduce any result
  • FastAPI is a popular choice for serving ML models as REST APIs
  • Containerize models with Docker for reproducible, scalable deployments
  • Monitor for data drift in production; models degrade as data distributions change
