Experiment Tracking with MLflow and Langfuse
Set up experiment tracking for ML models with MLflow and LLM observability with Langfuse. Includes hyperparameter sweeps, model registry, and cost tracking.
Every ML engineer has been there: you run an experiment, get great results, then can’t remember what hyperparameters you used. Or worse—you can’t reproduce the results because you didn’t track the code version, data version, or random seed.
Experiment tracking solves this by automatically logging everything: parameters, metrics, artifacts, and code. This tutorial covers two essential tools:
- MLflow - The standard for tracking traditional ML experiments
- Langfuse - Purpose-built observability for LLM applications
By the end, you’ll have a complete tracking setup that works for both classical ML and LLM-powered systems.
Why Track Experiments?
Without tracking, you’re flying blind:
| Problem | Impact |
|---|---|
| Can’t reproduce results | Wasted time re-running experiments |
| Lost hyperparameters | Can’t explain why one model is better |
| No comparison | Hard to know if changes helped |
| Missing artifacts | Models get lost or overwritten |
| Unknown costs | LLM bills surprise you at month end |
Proper tracking gives you:
- Reproducibility - Every experiment can be recreated
- Comparison - Side-by-side metric comparison
- Lineage - Know exactly what produced each model
- Collaboration - Share results with your team
- Cost visibility - Track token usage and API costs
Part 1: MLflow for ML Experiments
MLflow is the industry standard for experiment tracking. It’s open source, works with any ML framework, and integrates with major cloud platforms.
Installation and Setup
pip install mlflow torch transformers datasets scikit-learn
MLflow supports multiple backend stores:
| Backend | Use Case | Setup |
|---|---|---|
| File system | Local development | file:///path/to/mlruns |
| SQLite | Single-user, persistent | sqlite:///mlflow.db |
| PostgreSQL | Team, production | postgresql://user:pass@host/db |
| MLflow Cloud | Managed service | Databricks integration |
For this tutorial, we’ll start with file-based tracking, then show database setup.
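The backend choice only changes the tracking URI string you hand to MLflow. As a quick illustration of the table above, here is a small stdlib-only helper (hypothetical, not part of MLflow) that classifies a tracking URI by its scheme:

```python
# classify_backend.py - maps an MLflow tracking URI to the backend type
# from the table above. Illustrative sketch; MLflow itself just takes the URI.
from urllib.parse import urlparse

def classify_backend(tracking_uri: str) -> str:
    """Return a human-readable backend name for an MLflow tracking URI."""
    scheme = urlparse(tracking_uri).scheme
    if scheme in ("", "file"):
        return "file system"
    if scheme == "sqlite":
        return "sqlite"
    if scheme.startswith("postgresql"):
        return "postgresql"
    return scheme  # http/https point at a remote tracking server

print(classify_backend("file:///path/to/mlruns"))        # file system
print(classify_backend("sqlite:///mlflow.db"))           # sqlite
print(classify_backend("postgresql://user:pass@host/db"))  # postgresql
```

Whichever backend you choose, the rest of the tracking code is identical.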
Basic Experiment Tracking
Here’s a complete example that trains a sentiment classifier and logs everything to MLflow:
# mlflow_demo.py
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, AutoModel
from datasets import load_dataset
import time
# Configure MLflow
mlflow.set_tracking_uri("file:///home/ubuntu/projects/experiment-tracking/mlruns")
mlflow.set_experiment("sentiment-classification")
class SentimentClassifier(nn.Module):
"""Simple sentiment classifier using a pretrained encoder."""
def __init__(self, encoder_name: str, num_labels: int = 2, dropout: float = 0.1):
super().__init__()
self.encoder = AutoModel.from_pretrained(encoder_name)
self.dropout = nn.Dropout(dropout)
self.classifier = nn.Linear(self.encoder.config.hidden_size, num_labels)
def forward(self, input_ids, attention_mask):
outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
pooled = outputs.last_hidden_state[:, 0] # CLS token
pooled = self.dropout(pooled)
return self.classifier(pooled)
def train_epoch(model, dataloader, optimizer, criterion, device):
model.train()
total_loss = 0
correct = 0
total = 0
for batch in dataloader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
labels = batch["label"].to(device)
optimizer.zero_grad()
outputs = model(input_ids, attention_mask)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
_, predicted = outputs.max(1)
correct += (predicted == labels).sum().item()
total += labels.size(0)
return total_loss / len(dataloader), correct / total
def evaluate(model, dataloader, criterion, device):
model.eval()
total_loss = 0
correct = 0
total = 0
with torch.no_grad():
for batch in dataloader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
labels = batch["label"].to(device)
outputs = model(input_ids, attention_mask)
loss = criterion(outputs, labels)
total_loss += loss.item()
_, predicted = outputs.max(1)
correct += (predicted == labels).sum().item()
total += labels.size(0)
return total_loss / len(dataloader), correct / total
def run_experiment(
encoder_name: str = "distilbert-base-uncased",
learning_rate: float = 2e-5,
batch_size: int = 32,
epochs: int = 3,
dropout: float = 0.1,
max_samples: int = 5000
):
"""Run a single experiment with MLflow tracking."""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
with mlflow.start_run():
# Log ALL parameters upfront
mlflow.log_params({
"encoder_name": encoder_name,
"learning_rate": learning_rate,
"batch_size": batch_size,
"epochs": epochs,
"dropout": dropout,
"max_samples": max_samples,
"device": str(device)
})
# Load and prepare data
dataset = load_dataset("imdb", split="train").shuffle(seed=42).select(range(max_samples))
test_dataset = load_dataset("imdb", split="test").shuffle(seed=42).select(range(max_samples // 5))
train_size = int(0.9 * len(dataset))
train_dataset = dataset.select(range(train_size))
val_dataset = dataset.select(range(train_size, len(dataset)))
mlflow.log_params({
"train_samples": len(train_dataset),
"val_samples": len(val_dataset),
"test_samples": len(test_dataset)
})
# Initialize model
tokenizer = AutoTokenizer.from_pretrained(encoder_name)
model = SentimentClassifier(encoder_name, dropout=dropout).to(device)
# Log model info
total_params = sum(p.numel() for p in model.parameters())
mlflow.log_param("total_parameters", total_params)
# Setup training
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
# Collate function for DataLoader
def collate_fn(batch):
texts = [item["text"] for item in batch]
labels = torch.tensor([item["label"] for item in batch])
encoded = tokenizer(texts, padding=True, truncation=True,
max_length=128, return_tensors="pt")
return {"input_ids": encoded["input_ids"],
"attention_mask": encoded["attention_mask"],
"label": labels}
train_loader = DataLoader(train_dataset, batch_size=batch_size,
shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size,
collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size,
collate_fn=collate_fn)
# Training loop with per-epoch logging
best_val_acc = 0
start_time = time.time()
for epoch in range(epochs):
train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
val_loss, val_acc = evaluate(model, val_loader, criterion, device)
# Log metrics at each epoch
mlflow.log_metrics({
"train_loss": train_loss,
"train_accuracy": train_acc,
"val_loss": val_loss,
"val_accuracy": val_acc,
}, step=epoch)
print(f"Epoch {epoch+1}/{epochs}: val_acc={val_acc:.4f}")
if val_acc > best_val_acc:
best_val_acc = val_acc
# Final evaluation
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
total_time = time.time() - start_time
mlflow.log_metrics({
"test_loss": test_loss,
"test_accuracy": test_acc,
"best_val_accuracy": best_val_acc,
"total_training_time_seconds": total_time
})
# Log GPU memory if available
if torch.cuda.is_available():
gpu_memory_gb = torch.cuda.max_memory_allocated() / (1024**3)
mlflow.log_metric("gpu_memory_gb", gpu_memory_gb)
# Log the model artifact
mlflow.pytorch.log_model(model, "model")
return test_acc
Real Results
I ran this on an NVIDIA L40S GPU with 5,000 IMDB samples:
============================================================
MLflow Experiment Tracking Demo
============================================================
Using device: cuda
Loading dataset...
Loading model: distilbert-base-uncased
Total parameters: 66,364,418
Epoch 1/3: train_loss=0.4645, train_acc=0.7687, val_loss=0.3345, val_acc=0.8480, time=7.7s
Epoch 2/3: train_loss=0.2793, train_acc=0.8876, val_loss=0.3218, val_acc=0.8540, time=7.4s
Epoch 3/3: train_loss=0.1626, train_acc=0.9416, val_loss=0.3917, val_acc=0.8540, time=7.4s
Test accuracy: 0.8440
Total training time: 23.1s
Peak GPU memory: 2.03 GB
Hyperparameter Sweeps
MLflow makes it easy to compare experiments. Here’s a sweep over learning rates and batch sizes:
# mlflow_sweep.py
experiments = [
{"learning_rate": 1e-5, "batch_size": 16},
{"learning_rate": 5e-5, "batch_size": 32},
{"learning_rate": 2e-5, "batch_size": 64},
]
for config in experiments:
run_experiment(**config, epochs=3, max_samples=3000)
Sweep Results (L40S GPU):
| Run ID | Learning Rate | Batch Size | Test Accuracy | Time (s) | GPU (GB) |
|---|---|---|---|---|---|
| 17734e0c | 2e-05 | 32 | 0.8440 | 23.1 | 2.03 |
| 583e230a | 1e-05 | 16 | 0.8233 | 17.9 | 1.39 |
| 57815b97 | 2e-05 | 64 | 0.8133 | 11.9 | 3.25 |
| 9d1444d0 | 5e-05 | 32 | 0.7883 | 13.9 | 2.02 |
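Picking the winner from a sweep is just a max over the logged metric. As a sanity check on the table above, here is the comparison reproduced offline with the numbers hard-coded (in practice you would pull these rows via `mlflow.search_runs`, shown in the next section):

```python
# pick_best_run.py - offline comparison of the sweep results above.
sweep_results = [
    {"run_id": "17734e0c", "lr": 2e-5, "batch_size": 32, "test_acc": 0.8440},
    {"run_id": "583e230a", "lr": 1e-5, "batch_size": 16, "test_acc": 0.8233},
    {"run_id": "57815b97", "lr": 2e-5, "batch_size": 64, "test_acc": 0.8133},
    {"run_id": "9d1444d0", "lr": 5e-5, "batch_size": 32, "test_acc": 0.7883},
]

# Select the run with the highest test accuracy
best = max(sweep_results, key=lambda r: r["test_acc"])
print(f"Best run: {best['run_id']} (lr={best['lr']}, acc={best['test_acc']:.4f})")
# Best run: 17734e0c (lr=2e-05, acc=0.8440)
```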
Querying Runs Programmatically
import mlflow
mlflow.set_tracking_uri("file:///path/to/mlruns")
# Get experiment
experiment = mlflow.get_experiment_by_name("sentiment-classification")
# Search runs with filters
runs = mlflow.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.test_accuracy > 0.8",
order_by=["metrics.test_accuracy DESC"]
)
# Display comparison
print(f"{'Run ID':<12} {'LR':<10} {'Batch':<6} {'Test Acc':<10}")
print("-" * 40)
for _, row in runs.iterrows():
print(f"{row['run_id'][:8]:<12} "
f"{row['params.learning_rate']:<10} "
f"{row['params.batch_size']:<6} "
f"{row['metrics.test_accuracy']:.4f}")
Model Registry
MLflow’s model registry manages model versions and deployment stages:
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Register a model from a run
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(model_uri, "sentiment-classifier")
# Transition to staging (note: MLflow 2.9+ deprecates stages in favor of model version aliases)
client.transition_model_version_stage(
name="sentiment-classifier",
version=model_version.version,
stage="Staging"
)
# Load model for inference
model = mlflow.pytorch.load_model("models:/sentiment-classifier/Staging")
Part 2: Langfuse for LLM Observability
While MLflow tracks traditional ML experiments, LLMs need different observability:
| Traditional ML | LLM Applications |
|---|---|
| Training metrics | Prompt/completion pairs |
| Model weights | Token usage |
| Hyperparameters | Latency per call |
| Epochs | Cost tracking |
| Batch accuracy | Quality scores |
Langfuse is purpose-built for LLM observability, tracking prompts, completions, tokens, latency, and cost.
Installation
pip install langfuse
Tracing Architecture
Langfuse uses three core concepts:
- Traces - Top-level container for a user interaction
- Spans - Sub-operations within a trace (retrieval, preprocessing)
- Generations - LLM calls with input/output/tokens
Trace: "answer_question"
├── Span: "retrieve_context"
│ └── (vector search, 45ms)
├── Generation: "generate_answer"
│ └── (Claude Haiku, 523ms, 134 tokens)
└── Span: "format_response"
└── (post-processing, 12ms)
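To make the three roles concrete, the hierarchy above can be modeled with plain dataclasses. This is an illustrative sketch, not Langfuse's actual data model; only generations carry token counts, while spans just record timing:

```python
# trace_model.py - minimal sketch of the trace/span/generation hierarchy
from dataclasses import dataclass, field
from typing import List

@dataclass
class Observation:
    kind: str            # "span" or "generation"
    name: str
    duration_ms: float
    tokens: int = 0      # only generations carry token usage

@dataclass
class Trace:
    name: str
    observations: List[Observation] = field(default_factory=list)

    def total_tokens(self) -> int:
        # Spans contribute 0, so this sums over generations only
        return sum(o.tokens for o in self.observations)

trace = Trace("answer_question", [
    Observation("span", "retrieve_context", 45),
    Observation("generation", "generate_answer", 523, tokens=134),
    Observation("span", "format_response", 12),
])
print(trace.total_tokens())  # 134
```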
Local Tracing Pattern
For development without a Langfuse server, you can implement local tracing:
# local_tracing.py
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List
@dataclass
class LLMTrace:
"""Structure for tracking LLM calls."""
trace_id: str
name: str
input: str
output: Optional[str] = None
model: str = ""
latency_ms: float = 0
input_tokens: int = 0
output_tokens: int = 0
total_tokens: int = 0
cost_usd: float = 0
    metadata: Optional[Dict[str, Any]] = None
status: str = "success"
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class TraceStore:
"""Local storage for LLM traces."""
def __init__(self, output_file: str = "traces.jsonl"):
self.traces: List[LLMTrace] = []
self.output_file = output_file
def log(self, trace: LLMTrace):
self.traces.append(trace)
with open(self.output_file, "a") as f:
f.write(json.dumps(asdict(trace)) + "\n")
def summary(self) -> Dict[str, Any]:
if not self.traces:
return {}
return {
"total_traces": len(self.traces),
"avg_latency_ms": sum(t.latency_ms for t in self.traces) / len(self.traces),
"total_tokens": sum(t.total_tokens for t in self.traces),
"total_cost_usd": sum(t.cost_usd for t in self.traces),
}
# Token pricing (per 1M tokens) - January 2026
PRICING = {
"us.anthropic.claude-haiku-4-5-20251001-v1:0": {"input": 0.80, "output": 4.00},
"anthropic.claude-3-5-sonnet-20241022-v2:0": {"input": 3.00, "output": 15.00},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate cost in USD."""
pricing = PRICING.get(model, {"input": 1.0, "output": 3.0})
return (input_tokens / 1e6) * pricing["input"] + (output_tokens / 1e6) * pricing["output"]
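To check the pricing math by hand: a Claude Haiku call with 1,000 input and 500 output tokens costs (1000/1e6) x 0.80 + (500/1e6) x 4.00 = $0.0028. A self-contained version of the same formula:

```python
# cost_check.py - hand-check of the per-1M-token pricing math above
# (Claude Haiku 4.5: input $0.80/M, output $4.00/M).
def cost_usd(input_tokens: int, output_tokens: int,
             in_price: float = 0.80, out_price: float = 4.00) -> float:
    """Cost in USD given token counts and per-1M-token prices."""
    return (input_tokens / 1e6) * in_price + (output_tokens / 1e6) * out_price

c = cost_usd(1000, 500)
print(f"${c:.4f}")  # $0.0028
```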
Traced Bedrock Client
Here’s a Bedrock client that automatically logs all LLM calls:
# traced_bedrock.py
import boto3
import json
import time
from typing import Any, Dict, Optional

from local_tracing import LLMTrace, TraceStore, calculate_cost

class TracedBedrockClient:
    """Bedrock client with automatic tracing."""
    def __init__(self, trace_store: TraceStore, region: str = "us-east-1"):
        self.client = boto3.client("bedrock-runtime", region_name=region)
        self.trace_store = trace_store
        self.trace_counter = 0

    def invoke(
        self,
        model_id: str,
        prompt: str,
        max_tokens: int = 1024,
        temperature: float = 0.7,
        trace_name: str = "llm_call",
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
"""Invoke model with automatic tracing."""
self.trace_counter += 1
trace_id = f"trace_{self.trace_counter:04d}"
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"temperature": temperature,
"messages": [{"role": "user", "content": prompt}]
}
start = time.time()
try:
response = self.client.invoke_model(
modelId=model_id,
body=json.dumps(body)
)
result = json.loads(response["body"].read())
latency = (time.time() - start) * 1000
output = result["content"][0]["text"]
input_tokens = result["usage"]["input_tokens"]
output_tokens = result["usage"]["output_tokens"]
trace = LLMTrace(
trace_id=trace_id,
name=trace_name,
input=prompt[:500],
output=output[:500],
model=model_id,
latency_ms=latency,
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=input_tokens + output_tokens,
cost_usd=calculate_cost(model_id, input_tokens, output_tokens),
metadata=metadata or {},
status="success"
)
self.trace_store.log(trace)
return output
except Exception as e:
latency = (time.time() - start) * 1000
trace = LLMTrace(
trace_id=trace_id,
name=trace_name,
input=prompt[:500],
model=model_id,
latency_ms=latency,
metadata=metadata or {},
status=f"error: {str(e)}"
)
self.trace_store.log(trace)
raise
Demo Results
Running three traced LLM calls:
============================================================
LLM Observability Demo (Local Tracing)
============================================================
Logging demo traces...
Logged trace: trace_0001 (523ms, 134 tokens)
Logged trace: trace_0002 (1247ms, 1570 tokens)
Logged trace: trace_0003 (834ms, 323 tokens)

============================================================
TRACE SUMMARY
============================================================
Total traces: 3
Avg latency: 868 ms
Total tokens: 2,027
Total cost: $0.003714
Models used: us.anthropic.claude-haiku-4-5-20251001-v1:0

============================================================
TRACE DETAILS
============================================================
Name                    Latency     Tokens    Cost
------------------------------------------------------------
sentiment_analysis      523 ms      134       $0.000392
summarization           1247 ms     1570      $0.002280
code_generation         834 ms      323       $0.001042
Langfuse SDK Integration
For production, use the full Langfuse SDK with their cloud or self-hosted server:
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
import os
# Initialize client
langfuse = Langfuse(
public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
secret_key=os.environ["LANGFUSE_SECRET_KEY"],
host="https://cloud.langfuse.com" # or self-hosted URL
)
# Decorator-based tracing - automatically logs function calls
@observe()
def analyze_sentiment(text: str) -> str:
"""This function is automatically traced."""
response = bedrock_client.invoke(
model_id="us.anthropic.claude-haiku-4-5-20251001-v1:0",
prompt=f"Analyze sentiment: {text}"
)
return response
# Manual tracing with generations: nest an @observe(as_type="generation")
# function inside a traced parent, and attach model/usage details via
# langfuse_context.update_current_observation
@observe()
def process_document(doc: str):
    # Annotate the current span
    langfuse_context.update_current_observation(
        name="preprocess",
        metadata={"doc_length": len(doc)}
    )
    return summarize(doc)

@observe(as_type="generation")
def summarize(doc: str) -> str:
    response = call_llm(doc)
    # Record model, input/output, and token usage on the generation
    langfuse_context.update_current_observation(
        model="claude-haiku-4.5",
        input=doc,
        output=response,
        usage={"input": 100, "output": 50}
    )
    return response
# Important: flush before exit
langfuse.flush()
Self-Hosted Langfuse
For data privacy, run Langfuse locally with Docker:
# docker-compose.yml
version: '3.8'
services:
langfuse:
image: langfuse/langfuse:latest
ports:
- "3000:3000"
environment:
- DATABASE_URL=postgresql://postgres:postgres@db:5432/langfuse
- NEXTAUTH_URL=http://localhost:3000
- NEXTAUTH_SECRET=your-secret-key
- SALT=your-salt
depends_on:
- db
db:
image: postgres:15
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=langfuse
volumes:
- langfuse_data:/var/lib/postgresql/data
volumes:
langfuse_data:
docker-compose up -d
# Access at http://localhost:3000
Part 3: Combining MLflow and Langfuse
For hybrid ML/LLM systems, use both:
import mlflow
from langfuse import Langfuse
# MLflow for the training experiment
mlflow.set_experiment("rag-qa-system")
with mlflow.start_run():
# Log embedding model training
mlflow.log_params({"embedding_model": "bge-base", "epochs": 10})
train_embeddings()
mlflow.log_metric("recall@10", 0.85)
# Log retrieval evaluation
mlflow.log_metric("retrieval_precision", 0.72)
# Langfuse for LLM inference tracing
langfuse = Langfuse(...)
@observe()
def answer_question(question: str) -> str:
    # Retrieval (tracked as a span within the trace)
    docs = retrieve(question)
    # Generation (tracked with tokens/cost)
    return generate_answer(question, docs)

@observe(as_type="generation")
def generate_answer(question: str, docs) -> str:
    answer = llm(question, docs)
    langfuse_context.update_current_observation(output=answer)
    return answer
Best Practices
1. Log Everything Upfront
# Good: Log all params at start
with mlflow.start_run():
mlflow.log_params({
"model": model_name,
"learning_rate": lr,
"batch_size": bs,
"seed": 42,
"data_version": "v1.2.0",
"git_commit": get_git_hash()
})
2. Use Consistent Naming
# Experiments: project/task
mlflow.set_experiment("fraud-detection/baseline")
mlflow.set_experiment("fraud-detection/with-features")
# Traces: action_object
trace_name = "classify_transaction"
trace_name = "summarize_document"
3. Track Costs Proactively
# LLM cost tracking
def log_llm_cost(model: str, input_tokens: int, output_tokens: int):
cost = calculate_cost(model, input_tokens, output_tokens)
mlflow.log_metric("llm_cost_usd", cost)
# Alert if single call exceeds threshold
if cost > 0.10:
print(f"WARNING: High-cost LLM call: ${cost:.4f}")
4. Set Up Alerts
# Daily cost summary
def daily_summary(trace_store: TraceStore):
    summary = trace_store.summary()
    if not summary:
        return  # no traces logged yet
    if summary["total_cost_usd"] > 10.0:
        send_alert(f"Daily LLM cost exceeded $10: ${summary['total_cost_usd']:.2f}")
    if summary["avg_latency_ms"] > 2000:
        send_alert(f"High latency detected: {summary['avg_latency_ms']:.0f}ms avg")
MLflow vs Langfuse: When to Use Each
| Scenario | Tool | Reason |
|---|---|---|
| Training neural networks | MLflow | Epoch metrics, model artifacts |
| Hyperparameter tuning | MLflow | Run comparison, parameter logging |
| LLM prompt iteration | Langfuse | Prompt versioning, output quality |
| RAG pipeline | Both | MLflow for retrieval, Langfuse for generation |
| Production LLM | Langfuse | Real-time traces, cost monitoring |
| Model registry | MLflow | Version management, staging |
Full Code
All code from this tutorial is available at:
Key Takeaways
- Track everything - Parameters, metrics, artifacts, code version
- MLflow for ML - Training experiments, hyperparameter sweeps, model registry
- Langfuse for LLMs - Prompts, completions, tokens, latency, cost
- Use both together - Hybrid systems need both perspectives
- Cost visibility - LLM costs add up fast; track them from day one
- Self-host for privacy - Both tools support self-hosted deployment
What’s Next
This tutorial is part of the Senior MLE Guide series:
- GPU Sizing for ML Workloads
- Experiment Tracking with MLflow & Langfuse ← You are here
- CI/CD for Machine Learning (coming soon)
- Model Serving on AWS (coming soon)
- ML Monitoring & Drift Detection (coming soon)
- Security for ML Systems (coming soon)