MLOps: Deploying ML Models to Production
Learn the ML lifecycle: experiment tracking, model versioning, serving with FastAPI, containerization, and drift monitoring
What is MLOps?
MLOps (Machine Learning Operations) bridges the gap between ML development and production deployment. While data scientists build models in notebooks, MLOps engineers ensure those models run reliably at scale with monitoring, versioning, and automated retraining.
The MLOps Reality:
Only ~10% of ML code in production systems is the actual model. The other 90% is data pipelines, monitoring, serving infrastructure, testing, and automation.
ML Lifecycle
Data Collection & Preparation
ETL pipelines, data validation, feature engineering, data versioning with DVC (see the sketch after this list)
Experimentation & Training
Experiment tracking, hyperparameter tuning, model selection, reproducibility
Model Packaging & Registry
Model serialization, versioning, artifact storage, model cards
Deployment & Serving
REST APIs, batch inference, A/B testing, canary deployments, scaling
Monitoring & Retraining
Data drift detection, model performance monitoring, automated retraining triggers
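Data versioning is worth a concrete example: DVC commits small pointer files to Git while the data itself lives in remote storage, so any commit can reproduce its exact training set. A minimal sketch of the workflow (the S3 bucket name and file path are illustrative):

# git init && dvc init
# dvc remote add -d storage s3://my-bucket/dvc-store   # illustrative bucket
# dvc add data/train.csv         # writes a data/train.csv.dvc pointer file
# git add data/train.csv.dvc .gitignore
# git commit -m "Track training data with DVC"
# dvc push                       # upload the data to the remote
# dvc checkout                   # restore the data matching any git commit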
Experiment Tracking with MLflow
import mlflow
import mlflow.pytorch

# Start MLflow tracking server: mlflow server --port 5000

# Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("image-classifier-v2")

# Run an experiment
with mlflow.start_run(run_name="resnet50-adamw"):
    # Log hyperparameters
    mlflow.log_param("model", "resnet50")
    mlflow.log_param("optimizer", "AdamW")
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 20)

    # Training loop (simplified)
    for epoch in range(20):
        train_loss = 0.5 * (0.95 ** epoch)            # simulated
        val_accuracy = min(0.95, 0.6 + epoch * 0.02)  # simulated

        # Log metrics per epoch
        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_accuracy", val_accuracy, step=epoch)

    # Log the trained model
    # mlflow.pytorch.log_model(model, "model")

    # Log artifacts (plots, configs, etc.)
    # mlflow.log_artifact("confusion_matrix.png")

    # Log final metrics
    mlflow.log_metric("final_accuracy", 0.94)
    mlflow.log_metric("final_loss", 0.18)

print("Experiment tracked! View at http://localhost:5000")
# Compare runs, view metrics, download models from the UI
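Once a run looks good, promote it through MLflow's model registry so serving code loads a named version instead of a file path. A minimal sketch, assuming the run above actually logged a model under the artifact path "model" and MLflow >= 2.3 (for aliases); the run ID is a placeholder:

import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient

run_id = "abc123"  # placeholder: copy the real run ID from the MLflow UI

# Register the logged model under a stable name (creates a new version)
version = mlflow.register_model(f"runs:/{run_id}/model", "image-classifier")

# Point an alias at the version production should serve
client = MlflowClient()
client.set_registered_model_alias("image-classifier", "champion", version.version)

# Serving code can now load by alias instead of a hardcoded path
model = mlflow.pytorch.load_model("models:/image-classifier@champion")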
Serving Models with FastAPI
from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel
import torch
import torchvision.transforms as transforms
from PIL import Image
import io

app = FastAPI(title="Image Classification API")

# Load model at startup
model = None

@app.on_event("startup")  # FastAPI's lifespan handler is the modern alternative
async def load_model():
    global model
    # weights_only=False is required on PyTorch >= 2.6 to unpickle a full model
    model = torch.load("model.pt", map_location="cpu", weights_only=False)
    model.eval()
    print("Model loaded successfully!")

# Preprocessing pipeline
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

class PredictionResponse(BaseModel):
    label: str
    confidence: float
    class_id: int

@app.post("/predict", response_model=PredictionResponse)
async def predict(file: UploadFile = File(...)):
    """Classify an uploaded image."""
    # Read and preprocess image
    image_data = await file.read()
    image = Image.open(io.BytesIO(image_data)).convert("RGB")
    tensor = transform(image).unsqueeze(0)  # add batch dim

    # Run inference
    with torch.no_grad():
        outputs = model(tensor)
        probabilities = torch.softmax(outputs, dim=1)
        confidence, class_id = probabilities.max(dim=1)

    labels = ["cat", "dog", "bird", "fish", "other"]  # your classes
    return PredictionResponse(
        label=labels[class_id.item()],
        confidence=confidence.item(),
        class_id=class_id.item()
    )

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}

# Run: uvicorn app:app --host 0.0.0.0 --port 8000
# Test: curl -X POST -F "file=@cat.jpg" http://localhost:8000/predict
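Before containerizing, the endpoints can be exercised in-process with FastAPI's TestClient. A minimal sketch, assuming the code above is saved as app.py and that model.pt and a sample cat.jpg exist locally; using the client as a context manager is what triggers the startup handler:

from fastapi.testclient import TestClient
from app import app  # the serving module above

def test_health_and_predict():
    with TestClient(app) as client:  # context manager runs the startup event
        assert client.get("/health").json()["model_loaded"] is True
        with open("cat.jpg", "rb") as f:  # placeholder test image
            resp = client.post(
                "/predict", files={"file": ("cat.jpg", f, "image/jpeg")}
            )
        assert resp.status_code == 200
        assert 0.0 <= resp.json()["confidence"] <= 1.0

# Run with: pytest test_app.py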
Containerizing ML Models
# Dockerfile for ML model serving
# ---- Dockerfile ----
# FROM python:3.11-slim
#
# WORKDIR /app
#
# # curl is needed by the HEALTHCHECK below (not included in the slim base)
# RUN apt-get update && apt-get install -y --no-install-recommends curl \
#     && rm -rf /var/lib/apt/lists/*
#
# # Install dependencies
# COPY requirements.txt .
# RUN pip install --no-cache-dir -r requirements.txt
#
# # Copy model and application
# COPY model.pt .
# COPY app.py .
#
# # Expose port
# EXPOSE 8000
#
# # Health check
# HEALTHCHECK --interval=30s --timeout=10s \
#     CMD curl -f http://localhost:8000/health || exit 1
#
# # Run the server
# CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml for full ML serving stack
# version: "3.8"
# services:
#   ml-api:
#     build: .
#     ports:
#       - "8000:8000"
#     volumes:
#       - ./models:/app/models   # mount model directory
#     environment:
#       - MODEL_PATH=/app/models/latest.pt
#     deploy:
#       resources:
#         limits:
#           memory: 4G
#         reservations:
#           devices:
#             - driver: nvidia
#               count: 1
#               capabilities: [gpu]
#
#   prometheus:
#     image: prom/prometheus
#     ports:
#       - "9090:9090"
#     volumes:
#       - ./prometheus.yml:/etc/prometheus/prometheus.yml
print("Build: docker build -t ml-api .")
print("Run: docker run -p 8000:8000 ml-api")
print("Scale: docker-compose up --scale ml-api=3")
Monitoring Model Drift
import numpy as np
from scipy import stats

class DriftDetector:
    """Monitor for data and concept drift in production models."""

    def __init__(self, reference_data, threshold=0.05):
        self.reference = reference_data
        self.threshold = threshold

    def detect_data_drift(self, new_data, feature_names=None):
        """Kolmogorov-Smirnov test for distribution shift in each feature."""
        results = {}
        for i in range(new_data.shape[1]):
            stat, p_value = stats.ks_2samp(
                self.reference[:, i], new_data[:, i]
            )
            name = feature_names[i] if feature_names else f"feature_{i}"
            results[name] = {
                "statistic": stat,
                "p_value": p_value,
                "drift_detected": p_value < self.threshold
            }
        return results

    def detect_prediction_drift(self, reference_preds, new_preds):
        """Check if model predictions have shifted."""
        stat, p_value = stats.ks_2samp(reference_preds, new_preds)
        return {
            "statistic": stat,
            "p_value": p_value,
            "drift_detected": p_value < self.threshold
        }

# Usage
np.random.seed(42)
reference = np.random.randn(1000, 5)
new_data = np.random.randn(500, 5) + 0.5  # shifted data!

detector = DriftDetector(reference)
drift = detector.detect_data_drift(
    new_data,
    feature_names=["age", "income", "score", "clicks", "time"]
)

for feature, result in drift.items():
    status = "DRIFT!" if result["drift_detected"] else "OK"
    print(f"  {feature:10s}: p={result['p_value']:.4f} [{status}]")
Key Takeaways
- MLOps is 90% infrastructure and 10% model code in production systems
- Always track experiments (MLflow, W&B) so you can reproduce any result
- FastAPI is a popular choice for serving ML models as REST APIs
- Containerize models with Docker for reproducible, scalable deployments
- Monitor for data drift in production; models degrade as data distributions change