Model Serving on AWS
Deploy ML models to production with optimized inference: torch.compile vs ONNX benchmarks, FastAPI serving patterns, and AWS deployment options.
Your model achieves 95% accuracy on the test set. Congratulations. Now deploy it to handle 1,000 requests per second with P95 latency under 50ms. Suddenly accuracy doesn’t matter if users abandon the page before seeing results.
Model serving is where ML engineering meets systems engineering. This tutorial covers optimization techniques with real benchmarks, production serving patterns, and AWS deployment options.
Understanding Latency Requirements
Before optimizing, define your requirements:
| Application | P50 Target | P95 Target | Batch Size |
|---|---|---|---|
| Real-time API | under 50ms | under 100ms | 1 |
| Chatbot | under 100ms | under 200ms | 1 |
| Batch processing | under 1s | under 5s | 32-128 |
| Offline analytics | under 10s | under 30s | 128-512 |
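Once you collect per-request latencies from a load test, checking them against these targets is a few lines of NumPy. A minimal sketch (the sample latencies are made up; the thresholds are the real-time API targets from the table above):

import numpy as np

# Measured per-request latencies in milliseconds (e.g. from a load test)
latencies_ms = np.array([12.1, 18.4, 22.0, 35.7, 48.9, 51.3, 95.2])

p50 = np.percentile(latencies_ms, 50)
p95 = np.percentile(latencies_ms, 95)

print(f"P50: {p50:.1f}ms (target < 50ms)  -> {'OK' if p50 < 50 else 'MISS'}")
print(f"P95: {p95:.1f}ms (target < 100ms) -> {'OK' if p95 < 100 else 'MISS'}")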
Optimization Techniques
1. torch.compile() (PyTorch 2.0+)
The easiest optimization with the biggest payoff. torch.compile() uses TorchDynamo to trace your model and optimize it for your specific hardware.
import torch
# Original model
model = SentimentClassifier().cuda()
model.eval()
# Compile with reduce-overhead mode (best for inference)
compiled_model = torch.compile(model, mode='reduce-overhead')
# First inference triggers compilation (slow)
# Subsequent inferences are optimized
with torch.no_grad():
output = compiled_model(input_ids, attention_mask)
Compilation modes:
- default: Good balance of compile time and speedup
- reduce-overhead: Best for inference (smaller batches)
- max-autotune: Slowest compile, fastest inference
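If you're not sure which mode fits your workload, timing them side by side is cheap. A self-contained sketch with a tiny stand-in model (swap in your real model and inputs; assumes a CUDA GPU is available):

import time
import torch
import torch.nn as nn

# Small stand-in model purely to illustrate comparing compile modes
model = nn.Sequential(nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 2)).cuda().eval()
x = torch.randn(8, 512, device='cuda')

for mode in ['default', 'reduce-overhead', 'max-autotune']:
    compiled = torch.compile(model, mode=mode)
    with torch.no_grad():
        for _ in range(10):                 # warmup (includes compilation)
            compiled(x)
        torch.cuda.synchronize()
        start = time.perf_counter()
        for _ in range(100):
            compiled(x)
        torch.cuda.synchronize()
    print(f"{mode}: {(time.perf_counter() - start) / 100 * 1000:.3f} ms/iter")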
2. ONNX Runtime
Export your model to ONNX format and use ONNX Runtime for inference. It runs across frameworks and hardware backends, and often beats native PyTorch.
import torch
import onnxruntime as ort
# Export to ONNX
torch.onnx.export(
model,
(input_ids, attention_mask),
"model.onnx",
input_names=['input_ids', 'attention_mask'],
output_names=['logits'],
dynamic_axes={
'input_ids': {0: 'batch_size', 1: 'seq_len'},
'attention_mask': {0: 'batch_size', 1: 'seq_len'},
'logits': {0: 'batch_size'}
},
opset_version=18
)
# Load with ONNX Runtime
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
session = ort.InferenceSession("model.onnx", providers=providers)
# Run inference
output = session.run(
None,
{'input_ids': input_ids.numpy(), 'attention_mask': attention_mask.numpy()}
)
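Before benchmarking the exported model, it's worth confirming it produces the same logits as the PyTorch model. A quick sanity check, reusing the model, tensors, and session from the export snippet above (assumes they're all on the same device, e.g. CPU; the tolerances are a judgment call):

import numpy as np

# PyTorch reference output
with torch.no_grad():
    torch_logits = model(input_ids, attention_mask).cpu().numpy()

# ONNX Runtime output (first element of session.run's output list)
ort_logits = session.run(
    None,
    {'input_ids': input_ids.numpy(), 'attention_mask': attention_mask.numpy()}
)[0]

# Small numerical drift is expected; large differences mean an export problem
np.testing.assert_allclose(torch_logits, ort_logits, rtol=1e-3, atol=1e-4)
print("max abs diff:", np.abs(torch_logits - ort_logits).max())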
3. Half Precision (FP16)
Halve memory usage and often improve throughput on GPUs with Tensor Cores.
# Convert model to FP16
model_fp16 = model.half()
# Inference: token IDs and the attention mask stay integer tensors
# (the embedding layer requires long indices); only the weights are FP16
with torch.no_grad():
    output = model_fp16(input_ids, attention_mask)
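An alternative that avoids casting weights by hand is torch.autocast, which keeps the model in FP32 and runs eligible ops (matmuls, attention) in FP16 inside the context manager; a minimal sketch:

# Mixed-precision inference with autocast: weights stay FP32,
# eligible ops run in FP16 on the GPU
with torch.no_grad(), torch.autocast(device_type='cuda', dtype=torch.float16):
    output = model(input_ids, attention_mask)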
Real Benchmark Results
I ran these benchmarks on an NVIDIA L40S GPU (46GB VRAM) with a DistilBERT-based sentiment classifier (66.5M parameters).
Latency Comparison
| Runtime | Batch 1 | Batch 8 | Batch 32 |
|---|---|---|---|
| PyTorch FP32 | 2.88ms | 2.77ms | 3.00ms |
| PyTorch FP16 | 2.70ms | 2.77ms | 3.01ms |
| torch.compile() | 0.46ms | 0.90ms | 2.53ms |
| ONNX Runtime (GPU) | 0.87ms | 1.24ms | 3.61ms |
| ONNX Runtime (CPU) | 25.39ms | 103.75ms | 357.73ms |
Throughput Comparison (QPS)
| Runtime | Batch 1 | Batch 8 | Batch 32 |
|---|---|---|---|
| PyTorch FP32 | 347 | 360 | 334 |
| PyTorch FP16 | 370 | 360 | 332 |
| torch.compile() | 2,165 | 1,112 | 395 |
| ONNX Runtime (GPU) | 1,150 | 804 | 277 |
| ONNX Runtime (CPU) | 39 | 10 | 3 |
Benchmark Code
Here’s the complete benchmark script I used:
#!/usr/bin/env python3
"""Model Serving Benchmark - Compare optimization techniques."""
import time
import statistics
import torch
import torch.nn as nn
from transformers import DistilBertModel, DistilBertTokenizer
import numpy as np
class SentimentClassifier(nn.Module):
"""DistilBERT-based sentiment classifier."""
def __init__(self):
super().__init__()
self.bert = DistilBertModel.from_pretrained('distilbert-base-uncased')
self.classifier = nn.Sequential(
nn.Linear(768, 256),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(256, 2)
)
def forward(self, input_ids, attention_mask):
outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
pooled = outputs.last_hidden_state[:, 0, :]
return self.classifier(pooled)
def benchmark(model, inputs, num_warmup=10, num_runs=100):
"""Run benchmark with warmup."""
device = next(model.parameters()).device
input_ids = inputs['input_ids'].to(device)
attention_mask = inputs['attention_mask'].to(device)
# Warmup
with torch.no_grad():
for _ in range(num_warmup):
_ = model(input_ids, attention_mask)
if device.type == 'cuda':
torch.cuda.synchronize()
# Measure
latencies = []
with torch.no_grad():
for _ in range(num_runs):
start = time.perf_counter()
_ = model(input_ids, attention_mask)
if device.type == 'cuda':
torch.cuda.synchronize()
latencies.append((time.perf_counter() - start) * 1000)
return {
'mean_ms': statistics.mean(latencies),
'p95_ms': np.percentile(latencies, 95),
'qps': 1000 / statistics.mean(latencies)
}
if __name__ == '__main__':
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
model = SentimentClassifier().to(device).eval()
# Create sample input
inputs = tokenizer(
["This is a test sentence for benchmarking."],
padding='max_length',
truncation=True,
max_length=128,
return_tensors='pt'
)
# Benchmark PyTorch FP32
print("PyTorch FP32:", benchmark(model, inputs))
# Benchmark torch.compile()
compiled = torch.compile(model, mode='reduce-overhead')
# Extra warmup for compilation
with torch.no_grad():
for _ in range(15):
compiled(inputs['input_ids'].to(device), inputs['attention_mask'].to(device))
print("torch.compile():", benchmark(compiled, inputs))
Example output from the L40S run:
PyTorch FP32: {'mean_ms': 2.88, 'p95_ms': 2.90, 'qps': 346.9}
torch.compile(): {'mean_ms': 0.46, 'p95_ms': 0.48, 'qps': 2165.0}
Production Serving with FastAPI
Here’s a production-ready FastAPI server with health checks, batching, and metrics:
#!/usr/bin/env python3
"""FastAPI Model Server with torch.compile optimization."""
import time
from typing import List, Optional
from contextlib import asynccontextmanager
import logging
import torch
import torch.nn as nn
from transformers import DistilBertModel, DistilBertTokenizer
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import uvicorn
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Model definition (same as before)
class SentimentClassifier(nn.Module):
def __init__(self):
super().__init__()
self.bert = DistilBertModel.from_pretrained('distilbert-base-uncased')
self.classifier = nn.Sequential(
nn.Linear(768, 256),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(256, 2)
)
def forward(self, input_ids, attention_mask):
outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
pooled = outputs.last_hidden_state[:, 0, :]
return self.classifier(pooled)
# Request/Response models
class PredictionRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=512)
class BatchRequest(BaseModel):
texts: List[str] = Field(..., min_items=1, max_items=32)
class PredictionResponse(BaseModel):
text: str
sentiment: str
confidence: float
latency_ms: float
class HealthResponse(BaseModel):
status: str
device: str
gpu_memory_gb: Optional[float] = None
requests_processed: int
# Model manager
class ModelManager:
def __init__(self):
self.model = None
self.tokenizer = None
self.device = None
self.requests_processed = 0
def load(self):
logger.info("Loading model...")
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
self.model = SentimentClassifier().to(self.device).eval()
# Apply torch.compile
if self.device.type == 'cuda':
logger.info("Compiling with torch.compile()...")
self.model = torch.compile(self.model, mode='reduce-overhead')
# Warmup
dummy = self.tokenizer("warmup", return_tensors='pt', padding='max_length',
truncation=True, max_length=128)
with torch.no_grad():
for _ in range(10):
self.model(dummy['input_ids'].to(self.device),
dummy['attention_mask'].to(self.device))
logger.info(f"Model loaded on {self.device}")
def predict(self, texts: List[str]) -> List[dict]:
start = time.perf_counter()
inputs = self.tokenizer(texts, return_tensors='pt', padding=True,
truncation=True, max_length=128)
input_ids = inputs['input_ids'].to(self.device)
attention_mask = inputs['attention_mask'].to(self.device)
with torch.no_grad():
logits = self.model(input_ids, attention_mask)
if self.device.type == 'cuda':
torch.cuda.synchronize()
probs = torch.softmax(logits, dim=-1)
preds = torch.argmax(probs, dim=-1)
confs = probs.max(dim=-1).values
latency = (time.perf_counter() - start) * 1000
labels = ['negative', 'positive']
results = []
for i, text in enumerate(texts):
results.append({
'text': text[:100] + '...' if len(text) > 100 else text,
'sentiment': labels[preds[i].item()],
'confidence': round(confs[i].item(), 4),
'latency_ms': round(latency / len(texts), 2)
})
self.requests_processed += len(texts)
return results
manager = ModelManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
manager.load()
yield
app = FastAPI(title="Sentiment API", lifespan=lifespan)
@app.get("/health", response_model=HealthResponse)
async def health():
gpu_mem = None
if manager.device and manager.device.type == 'cuda':
gpu_mem = round(torch.cuda.memory_allocated() / 1e9, 2)
return HealthResponse(
status="healthy",
device=str(manager.device),
gpu_memory_gb=gpu_mem,
requests_processed=manager.requests_processed
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(req: PredictionRequest):
results = manager.predict([req.text])
return results[0]
@app.post("/predict/batch")
async def predict_batch(req: BatchRequest):
return {"predictions": manager.predict(req.texts)}
if __name__ == "__main__":
uvicorn.run("server:app", host="0.0.0.0", port=8000, workers=1)
Testing the Server
import httpx
# Health check
resp = httpx.get("http://localhost:8000/health")
print(resp.json())
# {'status': 'healthy', 'device': 'cuda', 'gpu_memory_gb': 0.25, 'requests_processed': 0}
# Single prediction
resp = httpx.post(
"http://localhost:8000/predict",
json={"text": "This movie was fantastic!"}
)
print(resp.json())
# {'text': 'This movie was fantastic!', 'sentiment': 'positive', 'confidence': 0.92, 'latency_ms': 1.2}
# Batch prediction
resp = httpx.post(
"http://localhost:8000/predict/batch",
json={"texts": ["Great product!", "Terrible service.", "It's okay."]}
)
print(resp.json())
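Sequential requests tell you little about throughput. To get a rough feel for behavior under concurrency, a small asyncio + httpx script is enough; this is a sketch, not a replacement for a proper load-testing tool:

import asyncio
import time
import httpx

async def load_test(n_requests: int = 200, concurrency: int = 16):
    async with httpx.AsyncClient(base_url="http://localhost:8000", timeout=10.0) as client:
        sem = asyncio.Semaphore(concurrency)

        async def one_request():
            async with sem:
                r = await client.post("/predict", json={"text": "Great product!"})
                r.raise_for_status()

        start = time.perf_counter()
        await asyncio.gather(*(one_request() for _ in range(n_requests)))
        elapsed = time.perf_counter() - start
        print(f"{n_requests} requests in {elapsed:.2f}s -> {n_requests / elapsed:.0f} req/s")

asyncio.run(load_test())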
AWS Deployment Options
Option 1: Amazon SageMaker (Managed)
Best for: Teams without DevOps resources, auto-scaling requirements.
import sagemaker
from sagemaker.pytorch import PyTorchModel
model = PyTorchModel(
model_data='s3://my-bucket/model.tar.gz',
role='arn:aws:iam::123456789:role/SageMakerRole',
framework_version='2.0.0',
py_version='py310',
entry_point='inference.py'
)
predictor = model.deploy(
initial_instance_count=1,
instance_type='ml.g5.xlarge', # NVIDIA A10G, 24GB VRAM
endpoint_name='sentiment-classifier'
)
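Once the endpoint is up, how you invoke it depends on how your inference.py serializes requests. Assuming it accepts and returns JSON, the call looks roughly like this:

from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

predictor.serializer = JSONSerializer()
predictor.deserializer = JSONDeserializer()

# Payload/response shape depends entirely on your inference.py
result = predictor.predict({"text": "This movie was fantastic!"})
print(result)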
Pros:
- Auto-scaling built-in
- A/B testing support
- Managed infrastructure
Cons:
- Higher cost (~$1.00/hr for g5.xlarge)
- Cold start latency (30-60s)
- Less control over runtime
Option 2: AWS Lambda (Serverless)
Best for: Bursty traffic, cost optimization, CPU-only models.
# handler.py
import torch
from transformers import pipeline
# Load model outside handler (reused across invocations)
classifier = pipeline("sentiment-analysis", device=-1) # CPU only
def handler(event, context):
text = event.get('text', '')
result = classifier(text)[0]
return {
'statusCode': 200,
'body': {
'sentiment': result['label'],
'confidence': result['score']
}
}
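Invoking the function directly with boto3 (i.e. without API Gateway in front) looks like this; the function name is a placeholder:

import json
import boto3

lambda_client = boto3.client('lambda')

# 'sentiment-handler' is a placeholder -- use your deployed function's name
response = lambda_client.invoke(
    FunctionName='sentiment-handler',
    Payload=json.dumps({'text': 'Great product!'})
)
print(json.loads(response['Payload'].read()))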
Pros:
- Pay per request (no idle costs)
- Auto-scaling to thousands of concurrent requests
- No server management
Cons:
- No GPU support
- Cold starts (1-10s)
- Size and timeout limits (250MB unzipped package or 10GB container image; 15-minute max execution)
Option 3: Amazon ECS/EKS (Containers)
Best for: Full control, custom scaling, multi-model serving.
# Dockerfile
FROM pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY model/ /app/model/
COPY server.py /app/
EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
# task-definition.json
{
"family": "sentiment-classifier",
"requiresCompatibilities": ["EC2"],
"cpu": "4096",
"memory": "30720",
"containerDefinitions": [{
"name": "inference",
"image": "123456789.dkr.ecr.us-east-1.amazonaws.com/sentiment:latest",
"portMappings": [{"containerPort": 8000}],
"resourceRequirements": [{
"type": "GPU",
"value": "1"
}],
"healthCheck": {
"command": ["CMD-SHELL", "curl -f http://localhost:8000/health || exit 1"],
"interval": 30,
"timeout": 5,
"retries": 3
}
}]
}
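Because the task definition's keys match the boto3 parameter names, you can register it and point a service at it in a few lines; a sketch with placeholder cluster and service names:

import json
import boto3

ecs = boto3.client('ecs')

# Register the task definition from the JSON file above
with open('task-definition.json') as f:
    task_def = json.load(f)
ecs.register_task_definition(**task_def)

# 'inference-cluster' and 'sentiment-service' are placeholders
ecs.create_service(
    cluster='inference-cluster',
    serviceName='sentiment-service',
    taskDefinition='sentiment-classifier',
    desiredCount=1,
    launchType='EC2'
)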
Pros:
- Full control over environment
- Custom scaling policies
- GPU support with ECS GPU AMI
Cons:
- More operational overhead
- Need to manage cluster
- Manual scaling configuration
Cost Comparison
| Option | Instance | Cost/Hour | Cold Start | Best For |
|---|---|---|---|---|
| SageMaker | ml.g5.xlarge | ~$1.00 | 30-60s | Managed, auto-scale |
| Lambda | N/A | ~$0.0002/req | 1-10s | Bursty, CPU-only |
| ECS (GPU) | g5.xlarge | ~$0.60 | 0s (warm) | Control, multi-model |
| ECS (CPU) | c6i.xlarge | ~$0.17 | 0s (warm) | High volume, CPU |
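A quick way to sanity-check Lambda against an always-on instance is a break-even calculation using the rough numbers above (it ignores latency and GPU capacity, so treat it as a first-order filter only):

# Rough break-even between Lambda and an always-on ECS GPU instance
ecs_gpu_monthly = 0.60 * 24 * 30          # ~$432/month, always on
lambda_per_request = 0.0002

breakeven_requests = ecs_gpu_monthly / lambda_per_request
print(f"ECS GPU: ~${ecs_gpu_monthly:.0f}/month")
print(f"Lambda is cheaper below ~{breakeven_requests:,.0f} requests/month")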
Production Checklist
Before deploying to production:
Performance
- Benchmark with realistic data (not random tensors)
- Test with expected batch sizes
- Measure P95 and P99 latency, not just mean
- Load test to find breaking point
Reliability
- Health check endpoint that verifies model is loaded
- Graceful shutdown handling
- Request timeout configuration
- Error handling for malformed inputs
Observability
- Latency metrics (per endpoint)
- Throughput metrics (requests/second)
- Error rate tracking
- GPU memory monitoring
Security
- Input validation and sanitization
- Rate limiting
- Authentication if needed
- No sensitive data in logs
What’s Next
You now have optimized models serving predictions in production. But how do you know they’re still performing well?
- ML Monitoring - Detect drift, track performance, trigger retraining
- ML Security - IAM roles, secrets management, VPC configuration
Full Code
All benchmark and serving code is available on GitHub: largo-tutorials/model-serving