Advanced Pipeline Architecture
A production ML pipeline is more than just chaining a scaler and a model. It includes feature engineering steps, multiple preprocessing branches via ColumnTransformer, optional feature selection, and the final estimator — all composable and serializable as a single object. Because every preprocessing step is fitted only on the data passed to .fit(), pipelines prevent data leakage, make cross-validation correct, and simplify deployment to a single .predict() call.
Multi-Branch Pipeline for Mixed Data
import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.feature_selection import SelectFromModel
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, RobustScaler, PowerTransformer
# SYNTHETIC CREDIT DATASET
# Seed NumPy's global RNG so every run draws the same N rows.
np.random.seed(42)
N = 2000
df = pd.DataFrame({
    # Numeric features; each draw is clipped to a bounded range.
    "age": np.random.normal(38, 12, N).clip(18, 75),
    "income": np.random.exponential(55000, N).clip(15000, 300000),
    "credit": np.random.normal(680, 80, N).clip(300, 850),
    "loan_amt": np.random.exponential(18000, N).clip(1000, 80000),
    # Categorical features; `p` gives the sampling probability of each level.
    "education": np.random.choice(["high_school", "bachelor", "master", "phd"], N, p=[0.3, 0.4, 0.2, 0.1]),
    "employment": np.random.choice(["full-time", "part-time", "self-employed"], N, p=[0.6, 0.2, 0.2]),
    "region": np.random.choice(["north", "south", "east", "west"], N),
    # Binary target, drawn independently of the features above.
    "default": np.random.choice([0, 1], N, p=[0.82, 0.18]),
})

# Inject missing values
# Each (column, fraction) pair blanks out int(N * fraction) sampled rows.
# NOTE(review): the same random_state is reused for every column, so the
# sampled row sets probably overlap — confirm that is intended.
for col, frac in [("income", 0.08), ("credit", 0.05), ("education", 0.03)]:
    # This assignment is the loop body: it must be indented so it runs once
    # per column rather than failing to parse.
    missing_idx = df.sample(int(N * frac), random_state=1).index
    df.loc[missing_idx, col] = np.nan
# SEPARATE FEATURES FROM TARGET, THEN HOLD OUT A STRATIFIED 20% TEST SET
y = df["default"]
X = df.drop(columns="default")
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,  # keep the default / non-default ratio equal in both splits
    random_state=42,
)
# COLUMN GROUPS, ONE PER PREPROCESSING TREATMENT
# Normally distributed numerics -> StandardScaler.
numeric_standard = ["age", "credit"]
# Right-skewed numerics -> PowerTransformer.
numeric_skewed = ["income", "loan_amt"]
# Low-cardinality categoricals -> one-hot encoding.
categorical_low = ["education", "employment"]
# Unordered categoricals -> one-hot encoding, never ordinal codes.
categorical_nominal = ["region"]
# INDIVIDUAL BRANCH PIPELINES
# Each branch imputes first so the following transformer never sees NaN.
num_std_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
])
num_skew_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("power", PowerTransformer(method="yeo-johnson")),
])
cat_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    # No drop="first": with handle_unknown="ignore" an unseen category is
    # encoded as all zeros, which would be indistinguishable from the dropped
    # baseline level. Keeping every level leaves unknowns identifiable, and
    # the tree-based estimators downstream do not need a reference level.
    ("ohe", OneHotEncoder(sparse_output=False, handle_unknown="ignore")),
])

# COMBINE ALL BRANCHES
# Each entry is (name, transformer, columns); the outputs are concatenated
# in this order.
preprocessor = ColumnTransformer([
    ("num_std", num_std_pipe, numeric_standard),
    ("num_skew", num_skew_pipe, numeric_skewed),
    ("cat", cat_pipe, categorical_low + categorical_nominal),
])
# FULL PIPELINE: PREPROCESS -> SELECT FEATURES -> CLASSIFY
# A random forest ranks the preprocessed features; only those whose
# importance reaches the mean importance are passed on.
feature_selector = SelectFromModel(
    RandomForestClassifier(n_estimators=50, random_state=42),
    threshold="mean",
)
# Final estimator, trained on the selected features only.
gb_classifier = GradientBoostingClassifier(
    n_estimators=200,
    learning_rate=0.05,
    max_depth=4,
    random_state=42,
)
full_pipeline = Pipeline(
    steps=[
        ("preprocessor", preprocessor),
        ("feature_select", feature_selector),
        ("classifier", gb_classifier),
    ]
)
# TRAIN & EVALUATE
# Fit the whole pipeline on the training split; the test split is never
# used for fitting.
full_pipeline.fit(X_train, y_train)

# 5-fold cross-validated ROC AUC, computed on the training split only.
cv_auc = cross_val_score(full_pipeline, X_train, y_train, cv=5, scoring="roc_auc").mean()

# Hold-out ROC AUC from the fitted pipeline's positive-class probabilities.
# Pipeline.score() would report accuracy, and cross-validating on the test
# split would refit the model on test data, so neither yields a test AUC.
test_proba = full_pipeline.predict_proba(X_test)[:, 1]
test_auc = roc_auc_score(y_test, test_proba)

print(f"CV AUC: {cv_auc:.4f}")
print(f"Test AUC: {test_auc:.4f}")
# INSPECT PIPELINE STEPS
# `steps` is the ordered list of (name, estimator) pairs given to Pipeline.
print("\nPipeline steps:")
for step_name, step in full_pipeline.steps:
    # Loop body: print one line per step with its estimator class name.
    print(f" {step_name}: {type(step).__name__}")

# ACCESS INTERNAL STATE
# named_steps exposes each fitted step by the name it was registered under.
preprocessor_fitted = full_pipeline.named_steps["preprocessor"]
# get_support() is a boolean mask over the preprocessed features; its sum is
# the number of features the selector kept.
n_selected = full_pipeline.named_steps["feature_select"].get_support().sum()
print(f"Features selected: {n_selected}")
# SERIALIZE THE WHOLE PIPELINE
# One path constant keeps the dump and the load pointing at the same file.
MODEL_PATH = "credit_pipeline_v1.joblib"
joblib.dump(full_pipeline, MODEL_PATH)
print("Pipeline saved.")

# SINGLE API INFERENCE
# NOTE: joblib files are pickle-based; only load artifacts you produced or
# otherwise trust.
loaded = joblib.load(MODEL_PATH)
# One raw, unprocessed row using the same column names as the training frame;
# the loaded pipeline performs all preprocessing itself.
new_customer = pd.DataFrame([{
    "age": 32, "income": 48000, "credit": 640, "loan_amt": 15000,
    "education": "bachelor", "employment": "full-time", "region": "north",
}])
# Column 1 of predict_proba is the probability of class 1 (default).
default_prob = loaded.predict_proba(new_customer)[0, 1]
print(f"\nNew customer default prob: {default_prob:.1%}")
Tip
Practice Advanced Pipeline Architecture in small, isolated examples before integrating into larger projects. Breaking concepts into small experiments builds genuine understanding faster than reading alone.
80% of ML work is data preparation — garbage in = garbage out
Practice Task
Note
Practice Task — (1) Write a working example of Advanced Pipeline Architecture from scratch without looking at notes. (2) Modify it to handle an edge case (empty input, null value, or error state). (3) Share your solution in the Priygop community for feedback.
Quick Quiz
Common Mistake
Warning
A common mistake with Advanced Pipeline Architecture is skipping edge case testing — empty inputs, null values, and unexpected data types. Always validate boundary conditions to write robust, production-ready ML code.