Source code for co2_emissions_ml.pipeline

import pandas as pd
from co2_emissions_ml.preprocessing import build_preprocessor, build_target_transformer
from co2_emissions_ml.models import fit_cluster_model, predict_bundle
from co2_emissions_ml.evaluation import compute_metrics
from sklearn.model_selection import train_test_split
import argparse, joblib, os


[docs] def run_pipeline( data_path: str, target_col: str = None, ): # 1) Load data try: df = pd.read_csv(data_path) except UnicodeDecodeError: # fall back to latin-1 for files with extended characters print(f"[WARN] UTF-8 decode failed, retrying with latin-1 for {data_path}") df = pd.read_csv(data_path, encoding="latin-1") # If no target_col given, try to auto-detect if target_col is None: # pick the one column containing "CO2" or "Emissions" candidates = [ c for c in df.columns if "co2" in c.lower() or "emiss" in c.lower() ] if not candidates: raise KeyError( "No target column specified and no column matching 'CO2' or 'Emissions' found." ) # use the longest match (to avoid 'Transmission' matching 'transmission') target_col = max(candidates, key=len) print(f"[INFO] Auto-detected target column: '{target_col}'") if target_col not in df.columns: raise KeyError( f"Target column '{target_col}' not found in data. Available columns: {df.columns.tolist()}" ) X = df.drop(columns=[target_col]) y = df[target_col] # 2) Build preprocessing objects pre = build_preprocessor(X) tt = build_target_transformer() # 3) Fit ensemble bundle = fit_cluster_model(X, y, pre, tt) # 4) Evaluate on train & test split X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42) y_pred = predict_bundle(bundle, X_te) # 5) Metrics metrics = compute_metrics(y_te, y_pred) print("Test set performance:") for k, v in metrics.items(): print(f" {k}: {v:.4f}") return bundle, metrics
DEFAULT_MODEL = os.path.join(os.path.dirname(__file__), "..", "models", "bundle.pkl")
[docs] def main(): parser = argparse.ArgumentParser( description="Train & evaluate the CO₂ emissions pipeline" ) parser.add_argument( "--model", dest="model_path", type=str, default=DEFAULT_MODEL, help="Path to pre-trained bundle (for inference).", ) parser.add_argument("--data", dest="data_path", type=str, required=True) parser.add_argument("--target", dest="target_col", type=str, default=None) parser.add_argument("--output", dest="out_csv", type=str, default=None) args = parser.parse_args() # If model_path exists, run in inference‐only mode if os.path.exists(args.model_path) and args.target_col is None: print(f"[INFO] Loading pre-trained bundle from {args.model_path}") bundle = joblib.load(args.model_path) # Pure inference: read data, predict, write out try: df = pd.read_csv(args.data_path) except UnicodeDecodeError: print( f"[WARN] UTF-8 decode failed, retrying with latin-1 for {args.data_path}" ) df = pd.read_csv(args.data_path, encoding="latin-1") X_new = df.copy() preds = predict_bundle(bundle, X_new) df["predicted_CO2"] = preds if args.out_csv: df.to_csv(args.out_csv, index=False) print(f"[INFO] Wrote predictions to {args.out_csv}") else: print(df.head()) return # Otherwise, fall back to full training+evaluation bundle, metrics = run_pipeline(args.data_path, args.target_col) # If user wants raw predictions on every row: if args.out_csv: try: df = pd.read_csv(args.data_path) except UnicodeDecodeError: print( f"[WARN] UTF-8 decode failed, retrying with latin-1 for {args.data_path}" ) df = pd.read_csv(args.data_path, encoding="latin-1") # drop target if present if args.target_col in df.columns: X = df.drop(columns=[args.target_col]) else: X = df df["predicted_CO2"] = predict_bundle(bundle, X) df.to_csv(args.out_csv, index=False) print(f"[INFO] Wrote predictions to {args.out_csv}")
if __name__ == "__main__": main()