Retraining Strategies & MLOps
MLOps is the practice of applying DevOps principles to ML model lifecycle management: versioning, continuous integration, continuous deployment, monitoring, and automated retraining. A mature ML system automatically detects drift, triggers retraining on fresh data, validates the new model against the champion, and promotes it if it's better — all without manual intervention.
Automated Retraining Pipeline Pattern
import numpy as np
import pandas as pd
import joblib
import json
import datetime
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PowerTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_auc_score
from scipy.stats import ks_2samp
np.random.seed(42)
class MLOpsPipeline:
    """
    Automated ML lifecycle manager: detect drift, retrain, validate, promote.

    Implements a champion/challenger loop: a two-sample KS test per feature
    flags covariate drift; on drift, a challenger is trained on fresh data
    and promoted (persisted + metadata version bumped) only when it beats
    the champion's validation AUC by at least ``threshold["min_lift"]``.
    """

    def __init__(self, model_path: str, meta_path: str, feature_cols: list, threshold: dict):
        """
        Args:
            model_path: path to the serialized champion model (joblib file).
            meta_path: path to the champion's JSON metadata; must contain "version".
            feature_cols: feature columns monitored for drift and used at scoring time.
            threshold: decision thresholds — "ks_pval" (drift p-value cutoff)
                and "min_lift" (minimum AUC improvement required to promote).
        """
        self.model_path = model_path
        self.meta_path = meta_path
        self.feature_cols = feature_cols
        self.threshold = threshold  # drift + promotion thresholds
        self.champion = joblib.load(model_path)
        with open(meta_path) as f:
            self.meta = json.load(f)

    def _check_drift(self, reference: pd.DataFrame, production: pd.DataFrame) -> bool:
        """Return True if any monitored feature drifted.

        Runs a two-sample Kolmogorov-Smirnov test per feature. All features
        are tested (no early exit) so the log reports *every* drifting
        column, not only the first one encountered.
        """
        cutoff = self.threshold.get("ks_pval", 0.01)
        any_drift = False
        for col in self.feature_cols:
            _, pval = ks_2samp(reference[col].values, production[col].values)
            if pval < cutoff:
                print(f" DRIFT: {col} (KS p={pval:.4f})")
                any_drift = True
        return any_drift

    def _train_challenger(self, X_new: pd.DataFrame, y_new: pd.Series) -> Pipeline:
        """Train a challenger (scaler + gradient boosting) on fresh data."""
        pipe = Pipeline([
            ("sc", StandardScaler()),
            ("m", GradientBoostingClassifier(n_estimators=200, learning_rate=0.05, random_state=42)),
        ])
        pipe.fit(X_new, y_new)
        return pipe

    def _evaluate(self, model: Pipeline, X_val: pd.DataFrame, y_val: pd.Series) -> float:
        """Validation ROC-AUC of ``model`` using positive-class probabilities."""
        return roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])

    def run_retraining_cycle(self, ref_data: pd.DataFrame, prod_data: pd.DataFrame,
                             new_train: pd.DataFrame, new_target: pd.Series) -> str:
        """Run one drift-check -> retrain -> validate -> promote cycle.

        Returns:
            "NO_CHANGE" (no drift detected), "KEPT_CHAMPION" (challenger lift
            below threshold), or "PROMOTED" (challenger persisted as the new
            champion and metadata version bumped).
        """
        print(f"\nRetraining cycle at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M')}")
        # STEP 1: Check for drift
        drift_detected = self._check_drift(ref_data, prod_data)
        if not drift_detected:
            print(" No significant drift detected. Skipping retraining.")
            return "NO_CHANGE"
        # STEP 2: Train challenger on new data
        print(" Training challenger model on fresh data...")
        X_tr, X_val, y_tr, y_val = train_test_split(new_train, new_target, test_size=0.2, random_state=42)
        challenger = self._train_challenger(X_tr, y_tr)
        # NOTE(review): assumes new_train's columns match self.feature_cols
        # (champion and challenger are both scored on X_val[self.feature_cols],
        # but the challenger was fitted on new_train's full column set) —
        # verify against callers.
        champ_auc = self._evaluate(self.champion, X_val[self.feature_cols], y_val)
        chal_auc = self._evaluate(challenger, X_val[self.feature_cols], y_val)
        print(f" Champion AUC: {champ_auc:.4f}")
        print(f" Challenger AUC: {chal_auc:.4f}")
        # STEP 3: Promote only on a material improvement — the min_lift guard
        # avoids churning the champion on noise-level differences.
        if chal_auc > champ_auc + self.threshold.get("min_lift", 0.005):
            self.champion = challenger
            new_meta = {
                **self.meta,
                "version": self._increment_version(self.meta["version"]),
                "retrained": datetime.datetime.now().isoformat(),
                "metrics": {"champion_auc": champ_auc, "challenger_auc": chal_auc},
            }
            self.meta = new_meta
            joblib.dump(challenger, self.model_path)
            with open(self.meta_path, "w") as f:
                json.dump(new_meta, f, indent=2)
            print(f" PROMOTED: challenger v{new_meta['version']} (lift=+{chal_auc-champ_auc:.4f})")
            return "PROMOTED"
        else:
            print(f" KEPT CHAMPION: challenger lift {chal_auc-champ_auc:+.4f} below threshold")
            return "KEPT_CHAMPION"

    def _increment_version(self, version: str) -> str:
        """Bump the last (patch) component of a dotted version string."""
        parts = version.split(".")
        parts[-1] = str(int(parts[-1]) + 1)
        return ".".join(parts)
# SIMULATE MLOPS CYCLE
FEATURES = ["age", "income", "credit"]


def make_dataset(n: int, drift: float = 0.0, seed=None) -> tuple:
    """Simulate a credit-scoring dataset with optional covariate drift.

    Args:
        n: number of rows to generate.
        drift: drift intensity; shifts each feature's distribution
            (0.0 = stable reference distribution).
        seed: optional int seed; when given, a private RandomState is used
            for reproducibility, otherwise the global numpy RNG is used
            (original behavior).

    Returns:
        (X, y): feature DataFrame with columns FEATURES and a binary target
        Series. NOTE: the target is generated independently of the features
        — deliberate simulation noise, so model AUCs hover near 0.5.
    """
    rng = np.random if seed is None else np.random.RandomState(seed)
    X = pd.DataFrame({
        "age": rng.normal(38 + drift * 5, 12, n).clip(18, 75),
        "income": rng.exponential(55000 - drift * 5000, n).clip(15000, 200000),
        "credit": rng.normal(680 - drift * 30, 80, n).clip(300, 850),
    })
    y = pd.Series(rng.choice([0, 1], n, p=[0.83, 0.17]))
    return X, y
# Build and persist the initial champion model + metadata.
X_init, y_init = make_dataset(2000)
init_pipe = Pipeline([
    ("sc", StandardScaler()),
    ("m", GradientBoostingClassifier(n_estimators=100, random_state=42)),
])
init_pipe.fit(X_init, y_init)
joblib.dump(init_pipe, "prod_model.joblib")
init_meta = {"version": "1.0.0", "created_at": datetime.datetime.now().isoformat()}
with open("prod_meta.json", "w") as f:
    json.dump(init_meta, f)

# Create the MLOps pipeline wrapped around the persisted champion.
mlops = MLOpsPipeline("prod_model.joblib", "prod_meta.json", FEATURES,
                      {"ks_pval": 0.05, "min_lift": 0.002})

# Simulate monthly cycles with gradually increasing covariate drift.
for month, drift_level in enumerate([0.0, 0.2, 0.5, 0.8, 1.0], 1):
    ref_data, _ = make_dataset(500, drift=0.0)        # stable reference window
    prod_data, _ = make_dataset(300, drift=drift_level)  # drifted production window
    new_train, new_y = make_dataset(2000, drift=drift_level)  # fresh training data
    result = mlops.run_retraining_cycle(ref_data, prod_data, new_train, new_y)
    print(f" Month {month} (drift={drift_level}): {result}")
Tip
Practice retraining strategies and MLOps in small, isolated examples before integrating them into larger projects. Breaking concepts into small experiments builds genuine understanding faster than reading alone.
Neural networks learn by adjusting connection weights via backpropagation
Practice Task
Note
Practice Task — (1) Write a working example of an automated retraining pipeline (drift detection, challenger training, promotion) from scratch without looking at notes. (2) Modify it to handle an edge case (empty input, null value, or error state). (3) Share your solution in the Priygop community for feedback.
Quick Quiz
Common Mistake
Warning
A common mistake with retraining strategies and MLOps is skipping edge-case testing — empty inputs, null values, and unexpected data types. Always validate boundary conditions to write robust, production-ready ML code.