From Prototype to Production
Building an AI prototype that works in demos is very different from deploying a reliable, scalable, cost-effective production system. Production AI applications must handle concurrent users, manage costs, recover from failures, maintain quality, and comply with regulations. This lesson covers the engineering practices that bridge the gap.
Production Readiness Checklist
- Reliability: Error handling, retries, fallbacks, circuit breakers
- Scalability: Handle 10x traffic spikes without degradation
- Cost Control: Caching, model routing, token budgets
- Observability: Logging, monitoring, alerting, tracing
- Security: Input validation, output filtering, authentication, rate limiting
- Quality: Automated evaluation, regression testing, human review (a minimal regression-gate sketch follows this list)
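Most of these items map to code later in this lesson; quality is the exception, so here is a minimal sketch of the kind of regression gate that could run in CI. It assumes a JSONL eval set with `prompt` and `expected` fields, plus hypothetical `run_model` and `grade` helpers (a model call and a scorer) that are not defined in this lesson.

# Minimal quality gate: replay a fixed eval set and block the deploy
# if the pass rate falls below the recorded baseline.
import json
import sys

def run_eval_gate(eval_path: str, baseline_pass_rate: float) -> None:
    with open(eval_path) as f:
        cases = [json.loads(line) for line in f]
    passed = sum(
        1 for case in cases
        # run_model and grade are hypothetical helpers (model call + scorer)
        if grade(run_model(case["prompt"]), case["expected"])
    )
    pass_rate = passed / len(cases)
    print(f"pass rate {pass_rate:.1%} (baseline {baseline_pass_rate:.1%})")
    if pass_rate < baseline_pass_rate:
        sys.exit(1)  # non-zero exit fails the CI job and blocks the deploy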
Error Handling and Resilience
// Production-grade LLM client with resilience
import Anthropic from "@anthropic-ai/sdk";

interface LLMConfig {
  primaryModel: string;
  fallbackModel: string;
  maxRetries: number;
  timeoutMs: number;
  maxTokenBudget: number;
}

class ProductionLLMClient {
  private anthropic: Anthropic;
  private config: LLMConfig;
  private circuitBreaker = { failures: 0, lastFailure: 0, isOpen: false };

  constructor(config: LLMConfig) {
    this.anthropic = new Anthropic();
    this.config = config;
  }

  async call(
    messages: Anthropic.MessageParam[],
    options?: { system?: string; maxTokens?: number }
  ): Promise<string> {
    // Circuit breaker check
    if (this.circuitBreaker.isOpen) {
      const elapsed = Date.now() - this.circuitBreaker.lastFailure;
      if (elapsed < 60000) { // 1 minute cooldown
        return this.fallback(messages, options);
      }
      this.circuitBreaker.isOpen = false;
    }

    // Retry with exponential backoff
    for (let attempt = 0; attempt < this.config.maxRetries; attempt++) {
      try {
        const response = await Promise.race([
          this.anthropic.messages.create({
            model: this.config.primaryModel,
            max_tokens: options?.maxTokens || 1024,
            ...(options?.system ? { system: options.system } : {}),
            messages,
          }),
          this.timeout(this.config.timeoutMs),
        ]) as Anthropic.Message;

        // Reset circuit breaker on success
        this.circuitBreaker.failures = 0;
        return response.content[0].type === "text"
          ? response.content[0].text
          : "";
      } catch (error: any) {
        const isRetryable = error.status === 429 || error.status >= 500;
        if (!isRetryable || attempt === this.config.maxRetries - 1) {
          this.circuitBreaker.failures++;
          if (this.circuitBreaker.failures >= 5) {
            this.circuitBreaker.isOpen = true;
            this.circuitBreaker.lastFailure = Date.now();
          }
          // Fall back to secondary model
          return this.fallback(messages, options);
        }
        // Exponential backoff: 1s, 2s, 4s, 8s...
        const delay = Math.min(1000 * Math.pow(2, attempt), 30000);
        await new Promise(r => setTimeout(r, delay));
      }
    }
    return this.fallback(messages, options);
  }

  private async fallback(
    messages: Anthropic.MessageParam[],
    options?: { system?: string; maxTokens?: number }
  ): Promise<string> {
    try {
      const response = await this.anthropic.messages.create({
        model: this.config.fallbackModel,
        max_tokens: options?.maxTokens || 1024,
        ...(options?.system ? { system: options.system } : {}),
        messages,
      });
      return response.content[0].type === "text" ? response.content[0].text : "";
    } catch {
      return "I'm sorry, our AI service is temporarily unavailable. Please try again shortly.";
    }
  }

  private timeout(ms: number): Promise<never> {
    return new Promise((_, reject) =>
      setTimeout(() => reject(new Error("Request timed out")), ms)
    );
  }
}
// Usage
const client = new ProductionLLMClient({
  primaryModel: "claude-sonnet-4-20250514",
  fallbackModel: "claude-3-5-haiku-latest",
  maxRetries: 3,
  timeoutMs: 30000,
  maxTokenBudget: 100000, // not yet enforced by the client above
});
Caching Strategy
# Multi-layer caching for LLM applications
import hashlib
import json

import numpy as np
import redis
from sentence_transformers import SentenceTransformer


class LLMCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.encoder = SentenceTransformer("all-MiniLM-L6-v2")

    def _exact_key(self, prompt: str, model: str) -> str:
        """Generate cache key for exact match."""
        content = f"{model}:{prompt}"
        return f"llm:exact:{hashlib.sha256(content.encode()).hexdigest()}"

    def get_exact(self, prompt: str, model: str) -> str | None:
        """Check for exact prompt match."""
        key = self._exact_key(prompt, model)
        cached = self.redis.get(key)
        return cached.decode() if cached else None

    def set_exact(self, prompt: str, model: str, response: str, ttl: int = 3600):
        """Cache exact prompt-response pair."""
        key = self._exact_key(prompt, model)
        self.redis.setex(key, ttl, response)

    def get_semantic(self, prompt: str, threshold: float = 0.95) -> str | None:
        """Check for semantically similar cached prompts."""
        query_embedding = self.encoder.encode(prompt)
        # Search cached embeddings (in production, use a vector DB
        # instead of scanning every key)
        cached_keys = self.redis.keys("llm:semantic:*")
        for key in cached_keys:
            data = json.loads(self.redis.get(key))
            cached_embedding = np.array(data["embedding"])
            similarity = np.dot(query_embedding, cached_embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(cached_embedding)
            )
            if similarity > threshold:
                return data["response"]
        return None

    def set_semantic(self, prompt: str, response: str, ttl: int = 3600):
        """Cache with semantic similarity matching."""
        embedding = self.encoder.encode(prompt).tolist()
        key = f"llm:semantic:{hashlib.sha256(prompt.encode()).hexdigest()}"
        self.redis.setex(key, ttl, json.dumps({
            "embedding": embedding,
            "response": response,
            "prompt": prompt,
        }))


# Usage in production
cache = LLMCache()

async def cached_llm_call(prompt: str, model: str) -> str:
    # Layer 1: Exact match (fastest, ~1ms)
    cached = cache.get_exact(prompt, model)
    if cached:
        return cached
    # Layer 2: Semantic match (~10ms)
    cached = cache.get_semantic(prompt)
    if cached:
        return cached
    # Layer 3: Actual LLM call (~1000ms); llm_call is assumed to be
    # defined elsewhere in the application
    response = await llm_call(prompt, model)
    # Cache the result
    cache.set_exact(prompt, model, response)
    cache.set_semantic(prompt, response)
    return response
Rate Limiting and Cost Control
// Rate limiting and cost control
class UsageLimiter {
  private userUsage = new Map<string, {
    tokens: number;
    requests: number;
    cost: number;
    windowStart: number;
  }>();

  private limits = {
    maxTokensPerMinute: 100000,
    maxRequestsPerMinute: 60,
    // $10 per user per day; enforcing this needs a separate daily
    // window, which is omitted from this per-minute sketch
    maxCostPerDay: 10.0,
  };

  canProceed(userId: string, estimatedTokens: number): {
    allowed: boolean;
    reason?: string;
    retryAfterMs?: number;
  } {
    const now = Date.now();
    const usage = this.userUsage.get(userId);

    if (!usage || now - usage.windowStart > 60000) {
      this.userUsage.set(userId, {
        tokens: 0, requests: 0, cost: 0, windowStart: now,
      });
      return { allowed: true };
    }

    if (usage.requests >= this.limits.maxRequestsPerMinute) {
      return {
        allowed: false,
        reason: "Rate limit exceeded",
        retryAfterMs: 60000 - (now - usage.windowStart),
      };
    }

    if (usage.tokens + estimatedTokens > this.limits.maxTokensPerMinute) {
      return {
        allowed: false,
        reason: "Token limit exceeded",
        retryAfterMs: 60000 - (now - usage.windowStart),
      };
    }

    return { allowed: true };
  }

  recordUsage(userId: string, tokens: number, cost: number): void {
    const usage = this.userUsage.get(userId);
    if (usage) {
      usage.tokens += tokens;
      usage.requests += 1;
      usage.cost += cost;
    }
  }
}
Deployment Architecture
Production Architecture Components
| Layer | Component | Purpose |
|---|---|---|
| Edge | CDN + API Gateway | Rate limiting, auth, caching |
| Application | App Server (Next.js, FastAPI) | Business logic, orchestration |
| AI Layer | LLM Gateway (LiteLLM) | Model routing, fallback, logging |
| Data | Vector DB + Postgres | Embeddings, state, user data |
| Cache | Redis | Response caching, rate limiting |
| Monitoring | LangSmith / Datadog | Traces, metrics, alerts |
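To make the table concrete, here is a rough sketch of how a single request could flow through these layers, assuming a FastAPI app server and the LLMCache defined earlier; `limiter` and `llm_call` are hypothetical stand-ins for a per-user rate limiter (like the UsageLimiter above) and the LLM-gateway call.

# Sketch of one request passing through the layers in the table above.
from fastapi import FastAPI, HTTPException

app = FastAPI()
MODEL = "claude-sonnet-4-20250514"

@app.post("/ask")
async def ask(user_id: str, prompt: str):
    # Edge/app layer: per-user rate limiting before any model work.
    # `limiter` is a hypothetical Python counterpart of UsageLimiter;
    # len(prompt) // 4 is a rough token estimate.
    check = limiter.can_proceed(user_id, estimated_tokens=len(prompt) // 4)
    if not check["allowed"]:
        raise HTTPException(status_code=429, detail=check["reason"])

    # Cache layer: exact match first, then semantic match
    answer = cache.get_exact(prompt, MODEL) or cache.get_semantic(prompt)

    if answer is None:
        # AI layer: the gateway call handles routing, fallback, and logging;
        # llm_call is assumed to be defined elsewhere
        answer = await llm_call(prompt, MODEL)
        cache.set_exact(prompt, MODEL, answer)
        cache.set_semantic(prompt, answer)

    # Data/monitoring layers: persist the exchange and record usage (omitted)
    return {"answer": answer}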
Production Checklist
- Resilience: Retry with backoff, circuit breakers, model fallbacks, graceful degradation
- Caching: Exact match + semantic cache. Can reduce costs by 30-60%.
- Rate Limiting: Per-user token and request limits. Prevent a single user from exhausting your budget.
- Monitoring: Track latency (p50/p95/p99), error rate, token usage, and cost per request (see the logging sketch after this list).
- Testing: Automated eval suite running on every deployment. Block deploys that regress quality.
- Security: Input validation, output filtering, API key rotation, audit logging.
- Cost: Set daily/monthly budgets with automatic alerts. Use cheaper models for simpler tasks.
- Data Privacy: Understand what data reaches the LLM provider. Implement PII scrubbing if needed.
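A minimal sketch of the per-request logging behind the monitoring item above: one structured JSON line per request is enough to compute p50/p95/p99 latency, error rates, and cost dashboards downstream. The per-million-token prices here are placeholders, not real pricing; check your provider's current rates.

# Per-request metrics: one JSON log line with latency, tokens, and
# estimated cost, suitable for downstream dashboards and budget alerts.
import json
import logging
import time

logger = logging.getLogger("ai.metrics")

# Placeholder prices in USD per million tokens, not real pricing
PRICE_PER_MTOK = {"input": 3.00, "output": 15.00}

def record_request(model: str, input_tokens: int, output_tokens: int,
                   started_at: float, error: str | None = None) -> float:
    cost = (input_tokens * PRICE_PER_MTOK["input"]
            + output_tokens * PRICE_PER_MTOK["output"]) / 1_000_000
    logger.info(json.dumps({
        "model": model,
        "latency_ms": round((time.time() - started_at) * 1000),
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_usd": round(cost, 6),
        "error": error,
    }))
    return cost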
Scaling Strategies
# Production scaling with queue-based architecture
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Callable


@dataclass
class AIRequest:
    id: str
    prompt: str
    priority: int  # 1=high, 3=low
    callback: Callable


class AIRequestQueue:
    """Priority queue for AI requests with concurrency control."""

    def __init__(self, max_concurrent: int = 10):
        self.queues = {1: deque(), 2: deque(), 3: deque()}
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.running = True

    async def enqueue(self, request: AIRequest):
        self.queues[request.priority].append(request)

    async def process_loop(self):
        while self.running:
            request = self._next_request()
            if request:
                # Take a concurrency slot, then hand the request to its
                # own task so up to max_concurrent requests run at once.
                await self.semaphore.acquire()
                asyncio.create_task(self._handle(request))
            else:
                await asyncio.sleep(0.1)

    async def _handle(self, request: AIRequest):
        try:
            # process_ai_request is assumed to be defined elsewhere
            result = await process_ai_request(request)
            request.callback(result)
        except Exception as e:
            request.callback({"error": str(e)})
        finally:
            self.semaphore.release()

    def _next_request(self) -> AIRequest | None:
        for priority in [1, 2, 3]:
            if self.queues[priority]:
                return self.queues[priority].popleft()
        return None


# Usage: process high-priority requests first (inside an async context)
queue = AIRequestQueue(max_concurrent=20)
asyncio.create_task(queue.process_loop())
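Submitting work then looks like this (also from inside an async context); the callback receives either the model result or an error dict. The request contents here are purely illustrative.

# Example: enqueue a high-priority request with a simple callback
await queue.enqueue(AIRequest(
    id="req-123",
    prompt="Summarize today's support tickets",
    priority=1,
    callback=lambda result: print("request finished:", result),
))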
Summary
Going to production with AI applications requires engineering discipline beyond the AI itself. Build resilience with retries and fallbacks, control costs with caching and model routing, ensure quality with automated evaluation, and monitor everything. The gap between a working demo and a reliable production system is significant, but the patterns in this lesson — error handling, caching, rate limiting, observability, and testing — will get you there. Start with the basics and add sophistication as you scale.