Experiment Tracking with MLflow and Langfuse


Set up experiment tracking for ML models with MLflow and LLM observability with Langfuse. Includes hyperparameter sweeps, model registry, and cost tracking.

Every ML engineer has been there: you run an experiment, get great results, then can’t remember what hyperparameters you used. Or worse—you can’t reproduce the results because you didn’t track the code version, data version, or random seed.

Experiment tracking solves this by automatically logging everything: parameters, metrics, artifacts, and code. This tutorial covers two essential tools:

  • MLflow - The standard for tracking traditional ML experiments
  • Langfuse - Purpose-built observability for LLM applications

By the end, you’ll have a complete tracking setup that works for both classical ML and LLM-powered systems.

Why Track Experiments?

Without tracking, you’re flying blind:

Problem                    Impact
-------------------------  -----------------------------------------
Can’t reproduce results    Wasted time re-running experiments
Lost hyperparameters       Can’t explain why one model is better
No comparison              Hard to know if changes helped
Missing artifacts          Models get lost or overwritten
Unknown costs              LLM bills surprise you at month end

Proper tracking gives you:

  • Reproducibility - Every experiment can be recreated
  • Comparison - Side-by-side metric comparison
  • Lineage - Know exactly what produced each model
  • Collaboration - Share results with your team
  • Cost visibility - Track token usage and API costs

Part 1: MLflow for ML Experiments

MLflow is the industry standard for experiment tracking. It’s open source, works with any ML framework, and integrates with major cloud platforms.

Installation and Setup

pip install mlflow torch transformers datasets scikit-learn

MLflow supports multiple backend stores:

Backend       Use Case                 Setup
------------  -----------------------  --------------------------------
File system   Local development        file:///path/to/mlruns
SQLite        Single-user, persistent  sqlite:///mlflow.db
PostgreSQL    Team, production         postgresql://user:pass@host/db
MLflow Cloud  Managed service          Databricks integration

For this tutorial, we’ll start with file-based tracking, then show database setup.
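Moving to a database backend later is a one-line change. As a minimal sketch, this points the client at the SQLite URI from the table above (a PostgreSQL URI works the same way):

import mlflow

# Use a SQLite file as the backend store instead of the local mlruns directory.
# Any SQLAlchemy-compatible URI (e.g. PostgreSQL) can be substituted here.
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment("sentiment-classification")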

Basic Experiment Tracking

Here’s a complete example that trains a sentiment classifier and logs everything to MLflow:

# mlflow_demo.py
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, AutoModel
from datasets import load_dataset
import time

# Configure MLflow
mlflow.set_tracking_uri("file:///home/ubuntu/projects/experiment-tracking/mlruns")
mlflow.set_experiment("sentiment-classification")

class SentimentClassifier(nn.Module):
    """Simple sentiment classifier using a pretrained encoder."""

    def __init__(self, encoder_name: str, num_labels: int = 2, dropout: float = 0.1):
        super().__init__()
        self.encoder = AutoModel.from_pretrained(encoder_name)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(self.encoder.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        pooled = outputs.last_hidden_state[:, 0]  # CLS token
        pooled = self.dropout(pooled)
        return self.classifier(pooled)

def train_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for batch in dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    return total_loss / len(dataloader), correct / total

def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    return total_loss / len(dataloader), correct / total

def run_experiment(
    encoder_name: str = "distilbert-base-uncased",
    learning_rate: float = 2e-5,
    batch_size: int = 32,
    epochs: int = 3,
    dropout: float = 0.1,
    max_samples: int = 5000
):
    """Run a single experiment with MLflow tracking."""

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    with mlflow.start_run():
        # Log ALL parameters upfront
        mlflow.log_params({
            "encoder_name": encoder_name,
            "learning_rate": learning_rate,
            "batch_size": batch_size,
            "epochs": epochs,
            "dropout": dropout,
            "max_samples": max_samples,
            "device": str(device)
        })

        # Load and prepare data
        dataset = load_dataset("imdb", split="train").shuffle(seed=42).select(range(max_samples))
        test_dataset = load_dataset("imdb", split="test").shuffle(seed=42).select(range(max_samples // 5))

        train_size = int(0.9 * len(dataset))
        train_dataset = dataset.select(range(train_size))
        val_dataset = dataset.select(range(train_size, len(dataset)))

        mlflow.log_params({
            "train_samples": len(train_dataset),
            "val_samples": len(val_dataset),
            "test_samples": len(test_dataset)
        })

        # Initialize model
        tokenizer = AutoTokenizer.from_pretrained(encoder_name)
        model = SentimentClassifier(encoder_name, dropout=dropout).to(device)

        # Log model info
        total_params = sum(p.numel() for p in model.parameters())
        mlflow.log_param("total_parameters", total_params)

        # Setup training
        optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
        criterion = nn.CrossEntropyLoss()

        # Collate function for DataLoader
        def collate_fn(batch):
            texts = [item["text"] for item in batch]
            labels = torch.tensor([item["label"] for item in batch])
            encoded = tokenizer(texts, padding=True, truncation=True,
                              max_length=128, return_tensors="pt")
            return {"input_ids": encoded["input_ids"],
                   "attention_mask": encoded["attention_mask"],
                   "label": labels}

        train_loader = DataLoader(train_dataset, batch_size=batch_size,
                                 shuffle=True, collate_fn=collate_fn)
        val_loader = DataLoader(val_dataset, batch_size=batch_size,
                               collate_fn=collate_fn)
        test_loader = DataLoader(test_dataset, batch_size=batch_size,
                                collate_fn=collate_fn)

        # Training loop with per-epoch logging
        best_val_acc = 0
        start_time = time.time()

        for epoch in range(epochs):
            train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
            val_loss, val_acc = evaluate(model, val_loader, criterion, device)

            # Log metrics at each epoch
            mlflow.log_metrics({
                "train_loss": train_loss,
                "train_accuracy": train_acc,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
            }, step=epoch)

            print(f"Epoch {epoch+1}/{epochs}: val_acc={val_acc:.4f}")

            if val_acc > best_val_acc:
                best_val_acc = val_acc

        # Final evaluation
        test_loss, test_acc = evaluate(model, test_loader, criterion, device)
        total_time = time.time() - start_time

        mlflow.log_metrics({
            "test_loss": test_loss,
            "test_accuracy": test_acc,
            "best_val_accuracy": best_val_acc,
            "total_training_time_seconds": total_time
        })

        # Log GPU memory if available
        if torch.cuda.is_available():
            gpu_memory_gb = torch.cuda.max_memory_allocated() / (1024**3)
            mlflow.log_metric("gpu_memory_gb", gpu_memory_gb)

        # Log the model artifact
        mlflow.pytorch.log_model(model, "model")

        return test_acc
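To kick off a single run from the command line, a minimal entry point (hypothetical; the script's main block isn't shown here) could be:

# Hypothetical entry point; reuses the defaults from run_experiment above
if __name__ == "__main__":
    test_acc = run_experiment(
        encoder_name="distilbert-base-uncased",
        learning_rate=2e-5,
        batch_size=32,
        epochs=3,
    )
    print(f"Test accuracy: {test_acc:.4f}")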

Real Results

I ran this on an NVIDIA L40S GPU with 5,000 IMDB samples:

Output
============================================================
MLflow Experiment Tracking Demo
============================================================
Using device: cuda
Loading dataset...
Loading model: distilbert-base-uncased
Total parameters: 66,364,418
Epoch 1/3: train_loss=0.4645, train_acc=0.7687, val_loss=0.3345, val_acc=0.8480, time=7.7s
Epoch 2/3: train_loss=0.2793, train_acc=0.8876, val_loss=0.3218, val_acc=0.8540, time=7.4s
Epoch 3/3: train_loss=0.1626, train_acc=0.9416, val_loss=0.3917, val_acc=0.8540, time=7.4s

Test accuracy: 0.8440
Total training time: 23.1s
Peak GPU memory: 2.03 GB

Hyperparameter Sweeps

MLflow makes it easy to compare experiments. Here’s a sweep over learning rates and batch sizes:

# mlflow_sweep.py
experiments = [
    {"learning_rate": 1e-5, "batch_size": 16},
    {"learning_rate": 5e-5, "batch_size": 32},
    {"learning_rate": 2e-5, "batch_size": 64},
]

for config in experiments:
    run_experiment(**config, epochs=3, max_samples=3000)

Sweep Results (L40S GPU):

Run ID     Learning Rate  Batch Size  Test Accuracy  Time (s)  GPU (GB)
---------  -------------  ----------  -------------  --------  --------
17734e0c   2e-05          32          0.8440         23.1      2.03
583e230a   1e-05          16          0.8233         17.9      1.39
57815b97   2e-05          64          0.8133         11.9      3.25
9d1444d0   5e-05          32          0.7883         13.9      2.02

Querying Runs Programmatically

import mlflow

mlflow.set_tracking_uri("file:///path/to/mlruns")

# Get experiment
experiment = mlflow.get_experiment_by_name("sentiment-classification")

# Search runs with filters
runs = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    filter_string="metrics.test_accuracy > 0.8",
    order_by=["metrics.test_accuracy DESC"]
)

# Display comparison
print(f"{'Run ID':<12} {'LR':<10} {'Batch':<6} {'Test Acc':<10}")
print("-" * 40)
for _, row in runs.iterrows():
    print(f"{row['run_id'][:8]:<12} "
          f"{row['params.learning_rate']:<10} "
          f"{row['params.batch_size']:<6} "
          f"{row['metrics.test_accuracy']:.4f}")

Model Registry

MLflow’s model registry manages model versions and deployment stages:

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model from a run (run_id comes from a previous training run)
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(model_uri, "sentiment-classifier")

# Transition to staging
client.transition_model_version_stage(
    name="sentiment-classifier",
    version=model_version.version,
    stage="Staging"
)

# Load model for inference
model = mlflow.pytorch.load_model("models:/sentiment-classifier/Staging")
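Recent MLflow releases favor model version aliases over the older stage transitions. A rough equivalent of the staging flow above, assuming MLflow 2.3+ with alias support:

# Alias-based promotion (a sketch; the stage transition above still works but is deprecated)
client.set_registered_model_alias(
    name="sentiment-classifier",
    alias="staging",
    version=model_version.version
)

# Load by alias instead of stage
model = mlflow.pytorch.load_model("models:/sentiment-classifier@staging")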

Part 2: Langfuse for LLM Observability

While MLflow tracks traditional ML experiments, LLMs need different observability:

Traditional ML      LLM Applications
------------------  ------------------------
Training metrics    Prompt/completion pairs
Model weights       Token usage
Hyperparameters     Latency per call
Epochs              Cost tracking
Batch accuracy      Quality scores

Langfuse is purpose-built for LLM observability, tracking prompts, completions, tokens, latency, and cost.

Installation

pip install langfuse

Tracing Architecture

Langfuse uses three core concepts:

  1. Traces - Top-level container for a user interaction
  2. Spans - Sub-operations within a trace (retrieval, preprocessing)
  3. Generations - LLM calls with input/output/tokens
Trace: "answer_question"
├── Span: "retrieve_context"
│   └── (vector search, 45ms)
├── Generation: "generate_answer"
│   └── (Claude Haiku, 523ms, 134 tokens)
└── Span: "format_response"
    └── (post-processing, 12ms)
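With the Langfuse Python SDK (v2), that hierarchy maps directly onto client calls. A minimal sketch, assuming credentials are set via environment variables and using placeholder values:

from langfuse import Langfuse

# Reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST from the environment
langfuse = Langfuse()

# Trace: top-level container for one user interaction
trace = langfuse.trace(name="answer_question", input={"question": "What is MLflow?"})

# Span: a sub-operation such as retrieval
span = trace.span(name="retrieve_context")
# ... vector search ...
span.end()

# Generation: the LLM call with model, input/output, and token usage
generation = trace.generation(
    name="generate_answer",
    model="claude-haiku-4.5",
    input="What is MLflow?",
)
generation.end(output="MLflow is an open source platform...", usage={"input": 100, "output": 34})

langfuse.flush()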

Local Tracing Pattern

For development without a Langfuse server, you can implement local tracing:

# local_tracing.py
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List

@dataclass
class LLMTrace:
    """Structure for tracking LLM calls."""
    trace_id: str
    name: str
    input: str
    output: Optional[str] = None
    model: str = ""
    latency_ms: float = 0
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    cost_usd: float = 0
    metadata: Optional[Dict[str, Any]] = None
    status: str = "success"

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

class TraceStore:
    """Local storage for LLM traces."""

    def __init__(self, output_file: str = "traces.jsonl"):
        self.traces: List[LLMTrace] = []
        self.output_file = output_file

    def log(self, trace: LLMTrace):
        self.traces.append(trace)
        with open(self.output_file, "a") as f:
            f.write(json.dumps(asdict(trace)) + "\n")

    def summary(self) -> Dict[str, Any]:
        if not self.traces:
            return {}
        return {
            "total_traces": len(self.traces),
            "avg_latency_ms": sum(t.latency_ms for t in self.traces) / len(self.traces),
            "total_tokens": sum(t.total_tokens for t in self.traces),
            "total_cost_usd": sum(t.cost_usd for t in self.traces),
        }

# Token pricing (per 1M tokens) - January 2026
PRICING = {
    "us.anthropic.claude-haiku-4-5-20251001-v1:0": {"input": 0.80, "output": 4.00},
    "anthropic.claude-3-5-sonnet-20241022-v2:0": {"input": 3.00, "output": 15.00},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Calculate cost in USD."""
    pricing = PRICING.get(model, {"input": 1.0, "output": 3.0})
    return (input_tokens / 1e6) * pricing["input"] + (output_tokens / 1e6) * pricing["output"]
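As a quick sanity check, the store and pricing helper can be exercised without any LLM call; the values below are made up:

# Exercise the helpers with made-up values
store = TraceStore("traces.jsonl")
store.log(LLMTrace(
    trace_id="trace_0001",
    name="sentiment_analysis",
    input="I loved this movie!",
    output="positive",
    model="us.anthropic.claude-haiku-4-5-20251001-v1:0",
    latency_ms=523,
    input_tokens=100,
    output_tokens=34,
    total_tokens=134,
    cost_usd=calculate_cost("us.anthropic.claude-haiku-4-5-20251001-v1:0", 100, 34),
))
print(store.summary())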

Traced Bedrock Client

Here’s a Bedrock client that automatically logs all LLM calls:

# traced_bedrock.py
import boto3
import json
import time
from typing import Any, Dict

from local_tracing import LLMTrace, TraceStore, calculate_cost

class TracedBedrockClient:
    """Bedrock client with automatic tracing."""

    def __init__(self, trace_store: TraceStore, region: str = "us-east-1"):
        self.client = boto3.client("bedrock-runtime", region_name=region)
        self.trace_store = trace_store
        self.trace_counter = 0

    def invoke(
        self,
        model_id: str,
        prompt: str,
        max_tokens: int = 1024,
        temperature: float = 0.7,
        trace_name: str = "llm_call",
        metadata: Dict[str, Any] = None
    ) -> str:
        """Invoke model with automatic tracing."""

        self.trace_counter += 1
        trace_id = f"trace_{self.trace_counter:04d}"

        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": [{"role": "user", "content": prompt}]
        }

        start = time.time()

        try:
            response = self.client.invoke_model(
                modelId=model_id,
                body=json.dumps(body)
            )
            result = json.loads(response["body"].read())
            latency = (time.time() - start) * 1000

            output = result["content"][0]["text"]
            input_tokens = result["usage"]["input_tokens"]
            output_tokens = result["usage"]["output_tokens"]

            trace = LLMTrace(
                trace_id=trace_id,
                name=trace_name,
                input=prompt[:500],
                output=output[:500],
                model=model_id,
                latency_ms=latency,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                total_tokens=input_tokens + output_tokens,
                cost_usd=calculate_cost(model_id, input_tokens, output_tokens),
                metadata=metadata or {},
                status="success"
            )
            self.trace_store.log(trace)
            return output

        except Exception as e:
            latency = (time.time() - start) * 1000
            trace = LLMTrace(
                trace_id=trace_id,
                name=trace_name,
                input=prompt[:500],
                model=model_id,
                latency_ms=latency,
                metadata=metadata or {},
                status=f"error: {str(e)}"
            )
            self.trace_store.log(trace)
            raise
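Wiring it together looks roughly like this (assumes AWS credentials with access to the Bedrock model used earlier):

# Example usage; requires AWS credentials and Bedrock model access
store = TraceStore("traces.jsonl")
client = TracedBedrockClient(store)

answer = client.invoke(
    model_id="us.anthropic.claude-haiku-4-5-20251001-v1:0",
    prompt="Classify the sentiment of: 'The plot was thin but the acting was superb.'",
    trace_name="sentiment_analysis",
)
print(answer)
print(store.summary())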

Demo Results

Running three traced LLM calls:

Output
============================================================
LLM Observability Demo (Local Tracing)
============================================================

Logging demo traces...
Logged trace: trace_0001 (523ms, 134 tokens)
Logged trace: trace_0002 (1247ms, 1570 tokens)
Logged trace: trace_0003 (834ms, 323 tokens)

============================================================
TRACE SUMMARY
============================================================
Total traces:     3
Avg latency:      868 ms
Total tokens:     2,027
Total cost:       $0.003714
Models used:      us.anthropic.claude-haiku-4-5-20251001-v1:0

============================================================
TRACE DETAILS
============================================================
Name                 Latency    Tokens     Cost
------------------------------------------------------------
sentiment_analysis      523 ms     134     $0.000392
summarization          1247 ms    1570     $0.002280
code_generation         834 ms     323     $0.001042

Langfuse SDK Integration

For production, use the full Langfuse SDK with their cloud or self-hosted server:

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
import os

# Initialize client
langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host="https://cloud.langfuse.com"  # or self-hosted URL
)

# Decorator-based tracing - automatically logs function calls
@observe()
def analyze_sentiment(text: str) -> str:
    """This function is automatically traced."""
    response = bedrock_client.invoke(
        model_id="us.anthropic.claude-haiku-4-5-20251001-v1:0",
        prompt=f"Analyze sentiment: {text}"
    )
    return response

# Manual tracing with generations: nest an @observe(as_type="generation") call
@observe(as_type="generation")
def summarize(doc: str) -> str:
    response = call_llm(doc)
    # Attach model, input/output, and token usage to this generation
    langfuse_context.update_current_observation(
        name="summarize",
        model="claude-haiku-4.5",
        input=doc,
        output=response,
        usage={"input": 100, "output": 50}
    )
    return response

@observe()
def process_document(doc: str):
    # Update the current observation (the span for this function)
    langfuse_context.update_current_observation(
        name="preprocess",
        metadata={"doc_length": len(doc)}
    )
    return summarize(doc)

# Important: flush before exit
langfuse.flush()

Self-Hosted Langfuse

For data privacy, run Langfuse locally with Docker:

# docker-compose.yml
version: '3.8'
services:
  langfuse:
    image: langfuse/langfuse:latest
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/langfuse
      - NEXTAUTH_URL=http://localhost:3000
      - NEXTAUTH_SECRET=your-secret-key
      - SALT=your-salt
    depends_on:
      - db

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=langfuse
    volumes:
      - langfuse_data:/var/lib/postgresql/data

volumes:
  langfuse_data:

docker-compose up -d
# Access at http://localhost:3000
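Once the containers are up, point the SDK at the local instance instead of the cloud endpoint (the keys come from the project settings page of your local deployment):

from langfuse import Langfuse

# Keys are generated in the local Langfuse UI under project settings
langfuse = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="http://localhost:3000",
)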

Part 3: Combining MLflow and Langfuse

For hybrid ML/LLM systems, use both:

import mlflow
from langfuse import Langfuse

# MLflow for the training experiment
mlflow.set_experiment("rag-qa-system")

with mlflow.start_run():
    # Log embedding model training
    mlflow.log_params({"embedding_model": "bge-base", "epochs": 10})
    train_embeddings()
    mlflow.log_metric("recall@10", 0.85)

    # Log retrieval evaluation
    mlflow.log_metric("retrieval_precision", 0.72)

# Langfuse for LLM inference tracing
langfuse = Langfuse(...)

@observe(as_type="generation")
def generate_answer(question: str, docs) -> str:
    answer = llm(question, docs)
    # Record the output (plus tokens/cost) on this generation
    langfuse_context.update_current_observation(output=answer)
    return answer

@observe()
def answer_question(question: str) -> str:
    # Retrieval (tracked as a span)
    docs = retrieve(question)

    # Generation (tracked as a nested generation observation)
    return generate_answer(question, docs)

Best Practices

1. Log Everything Upfront

# Good: Log all params at start
with mlflow.start_run():
    mlflow.log_params({
        "model": model_name,
        "learning_rate": lr,
        "batch_size": bs,
        "seed": 42,
        "data_version": "v1.2.0",
        "git_commit": get_git_hash()
    })
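The get_git_hash() helper isn't defined in this tutorial; one possible implementation, using git rev-parse, is:

import subprocess

def get_git_hash() -> str:
    """Return the current commit hash, or 'unknown' outside a git repo."""
    try:
        return subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"], text=True
        ).strip()
    except (subprocess.CalledProcessError, FileNotFoundError):
        return "unknown"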

2. Use Consistent Naming

# Experiments: project/task
mlflow.set_experiment("fraud-detection/baseline")
mlflow.set_experiment("fraud-detection/with-features")

# Traces: action_object
trace_name = "classify_transaction"
trace_name = "summarize_document"

3. Track Costs Proactively

# LLM cost tracking
def log_llm_cost(model: str, input_tokens: int, output_tokens: int):
    cost = calculate_cost(model, input_tokens, output_tokens)
    mlflow.log_metric("llm_cost_usd", cost)

    # Alert if single call exceeds threshold
    if cost > 0.10:
        print(f"WARNING: High-cost LLM call: ${cost:.4f}")

4. Set Up Alerts

# Daily cost summary
def daily_summary(trace_store: TraceStore):
    summary = trace_store.summary()

    if summary["total_cost_usd"] > 10.0:
        send_alert(f"Daily LLM cost exceeded $10: ${summary['total_cost_usd']:.2f}")

    if summary["avg_latency_ms"] > 2000:
        send_alert(f"High latency detected: {summary['avg_latency_ms']:.0f}ms avg")

MLflow vs Langfuse: When to Use Each

Scenario                  Tool      Reason
------------------------  --------  ----------------------------------------------
Training neural networks  MLflow    Epoch metrics, model artifacts
Hyperparameter tuning     MLflow    Run comparison, parameter logging
LLM prompt iteration      Langfuse  Prompt versioning, output quality
RAG pipeline              Both      MLflow for retrieval, Langfuse for generation
Production LLM            Langfuse  Real-time traces, cost monitoring
Model registry            MLflow    Version management, staging

Full Code

All code from this tutorial is available at:

Key Takeaways

  1. Track everything - Parameters, metrics, artifacts, code version
  2. MLflow for ML - Training experiments, hyperparameter sweeps, model registry
  3. Langfuse for LLMs - Prompts, completions, tokens, latency, cost
  4. Use both together - Hybrid systems need both perspectives
  5. Cost visibility - LLM costs add up fast; track them from day one
  6. Self-host for privacy - Both tools support self-hosted deployment

What’s Next

This tutorial is part of the Senior MLE Guide series:

  1. GPU Sizing for ML Workloads
  2. Experiment Tracking with MLflow & Langfuse ← You are here
  3. CI/CD for Machine Learning (coming soon)
  4. Model Serving on AWS (coming soon)
  5. ML Monitoring & Drift Detection (coming soon)
  6. Security for ML Systems (coming soon)