diff --git a/notebooks/pdp/01-data-assessment-eda-TEMPLATE.py b/notebooks/pdp/01-data-assessment-eda-TEMPLATE.py index fe1ec743..fdabbd39 100644 --- a/notebooks/pdp/01-data-assessment-eda-TEMPLATE.py +++ b/notebooks/pdp/01-data-assessment-eda-TEMPLATE.py @@ -23,8 +23,11 @@ # COMMAND ---------- -# install dependencies, most of which should come through our 1st-party SST package -# %pip install git+https://github.com/datakind/student-success-tool.git@develop +# install dependencies, most/all of which should come through our 1st-party SST package +# NOTE: it's okay to use 'develop' or a feature branch while developing this nb +# but when it's finished, it's best to pin to a specific version of the package +# %pip install "student-success-tool == 0.1.0" +# %pip install "git+https://github.com/datakind/student-success-tool.git@develop" # COMMAND ---------- @@ -33,7 +36,6 @@ # COMMAND ---------- import logging -import os import sys import matplotlib.pyplot as plt @@ -45,14 +47,15 @@ from databricks.sdk.runtime import dbutils from student_success_tool.analysis import pdp +from student_success_tool import configs # COMMAND ---------- -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO, force=True) logging.getLogger("py4j").setLevel(logging.WARNING) # ignore databricks logger try: - spark_session = DatabricksSession.builder.getOrCreate() + spark = DatabricksSession.builder.getOrCreate() except Exception: logging.warning("unable to create spark session; are you in a Databricks runtime?") pass @@ -60,7 +63,7 @@ # COMMAND ---------- # MAGIC %md -# MAGIC ## `student-success-intervention` hacks +# MAGIC ## import school-specific code # COMMAND ---------- @@ -69,33 +72,21 @@ # COMMAND ---------- -# HACK: insert our 1st-party (school-specific) code into PATH +# insert our 1st-party (school-specific) code into PATH if "../" not in sys.path: sys.path.insert(1, "../") -# TODO: specify school's subpackage +# TODO: specify school's subpackage here from 
analysis import * # noqa: F403 # COMMAND ---------- -# MAGIC %md -# MAGIC ## unity catalog config - -# COMMAND ---------- - -catalog = "sst_dev" - -# configure where data is to be read from / written to -inst_name = "SCHOOL" # TODO: fill in school's name in Unity Catalog -read_schema = f"{inst_name}_bronze" -write_schema = f"{inst_name}_silver" - -path_volume = os.path.join( - "/Volumes", catalog, read_schema, f"{inst_name}_bronze_file_volume" -) -path_table = f"{catalog}.{read_schema}" -print(f"{path_table=}") -print(f"{path_volume=}") +# project configuration should be stored in a config file in TOML format +# it'll start out with just basic info: institution_id, institution_name +# but as each step of the pipeline gets built, more parameters will be moved +# from hard-coded notebook variables to shareable, persistent config fields +cfg = configs.load_config("./config-v2-TEMPLATE.toml", configs.PDPProjectConfigV2) +cfg # COMMAND ---------- @@ -109,14 +100,17 @@ # COMMAND ---------- -# TODO: fill in school's name; may not be same as in the schemas above -fpath_course = os.path.join(path_volume, "SCHOOL_COURSE_AR_DEID_DTTM.csv") +try: + raw_course_file_path = cfg.datasets.labeled.raw_course.file_path +except AttributeError: + # TODO: fill in the actual path to school's raw course file + raw_course_file_path = "/Volumes/CATALOG/INST_NAME_bronze/INST_NAME_bronze_file_volume/SCHOOL_COURSE_AR_DEID_DTTM.csv" # COMMAND ---------- # read without any schema validation, so we can look at the data "raw" df_course_raw = pdp.dataio.read_raw_pdp_course_data_from_file( - fpath_course, schema=None, dttm_format="%Y%m%d.0" + raw_course_file_path, schema=None, dttm_format="%Y%m%d.0" ) print(f"rows x cols = {df_course_raw.shape}") df_course_raw.head() @@ -127,6 +121,10 @@ # COMMAND ---------- +df_course_raw["course_begin_date"].describe() + +# COMMAND ---------- + # MAGIC %md # MAGIC Quick checks: # MAGIC - [ ] data exists where it should @@ -137,7 +135,7 @@ # try to read data while 
validating with the "base" PDP schema df_course = pdp.dataio.read_raw_pdp_course_data_from_file( - fpath_course, schema=pdp.schemas.RawPDPCourseDataSchema, dttm_format="%Y%m%d.0" + raw_course_file_path, schema=pdp.schemas.RawPDPCourseDataSchema, dttm_format="%Y%m%d.0" ) df_course @@ -199,7 +197,7 @@ # MAGIC ``` # MAGIC # MAGIC At this point, `df_course` should be a properly validated and parsed data frame, ready for exploratory data analysis. - +# MAGIC # COMMAND ---------- @@ -208,14 +206,18 @@ # COMMAND ---------- - -# TODO: fill in school's name; may not be same as in the schemas above -fpath_cohort = os.path.join(path_volume, "SCHOOL_COHORT_AR_DEID_DTTM.csv") +try: + raw_cohort_file_path = cfg.datasets.labeled.raw_cohort.file_path +except AttributeError: + # TODO: fill in the actual path to school's raw cohort file + raw_cohort_file_path = "/Volumes/CATALOG/INST_NAME_bronze/INST_NAME_bronze_file_volume/SCHOOL_COHORT_AR_DEID_DTTM.csv" # COMMAND ---------- # read without any schema validation, so we can look at the data "raw" -df_cohort_raw = pdp.dataio.read_raw_pdp_cohort_data_from_file(fpath_cohort, schema=None) +df_cohort_raw = pdp.dataio.read_raw_pdp_cohort_data_from_file( + raw_cohort_file_path, schema=None +) print(f"rows x cols = {df_cohort_raw.shape}") df_cohort_raw.head() @@ -223,7 +225,7 @@ # try to read data while validating with the "base" PDP schema df_cohort = pdp.dataio.read_raw_pdp_cohort_data_from_file( - fpath_cohort, schema=pdp.schemas.base.RawPDPCohortDataSchema + raw_cohort_file_path, schema=pdp.schemas.RawPDPCohortDataSchema ) df_cohort @@ -242,22 +244,31 @@ # COMMAND ---------- # MAGIC %md -# MAGIC ## save validated data +# MAGIC ## HEY, STOP HERE! 
+ +# COMMAND ---------- + +# MAGIC %md +# MAGIC Before continuing on to EDA, now's a great time to do a couple things: +# MAGIC +# MAGIC - Copy any school-specific raw dataset schemas into a `schemas.py` file in the current working directory +# MAGIC - Copy any school-specific preprocessing functions needed to coerce the raw data into a standardized form into a `dataio.py` file in the current working directory +# MAGIC - **Optional:** If you want easy access to outputs from every (sub-)step of the data transformation pipeline, save the validated datasets into this school's "silver" schema in Unity Catalog. # COMMAND ---------- pdp.dataio.write_data_to_delta_table( df_course, - f"{catalog}.{write_schema}.course_dataset_validated", - spark_session=spark_session, + "CATALOG.INST_NAME_silver.course_dataset_validated", + spark_session=spark, ) # COMMAND ---------- pdp.dataio.write_data_to_delta_table( df_cohort, - f"{catalog}.{write_schema}.cohort_dataset_validated", - spark_session=spark_session, + "CATALOG.INST_NAME_silver.cohort_dataset_validated", + spark_session=spark, ) # COMMAND ---------- @@ -269,17 +280,17 @@ # MAGIC %md # MAGIC %md -# MAGIC ## read validated data +# MAGIC ## read validated data? 
# MAGIC -# MAGIC (so you don't have to execute the validation process more than once) +# MAGIC (optional, so you don't have to execute the validation process more than once) # COMMAND ---------- # use base or school-specific schema, as needed df_course = pdp.schemas.RawPDPCourseDataSchema( pdp.dataio.read_data_from_delta_table( - f"{catalog}.{write_schema}.course_dataset_validated", - spark_session=spark_session, + "CATALOG.INST_NAME_silver.course_dataset_validated", + spark_session=spark, ) ) df_course.shape @@ -288,8 +299,8 @@ df_cohort = pdp.schemas.RawCohortDataSchema( pdp.dataio.read_data_from_delta_table( - f"{catalog}.{write_schema}.cohort_dataset_validated", - spark_session=spark_session, + "CATALOG.INST_NAME_silver.cohort_dataset_validated", + spark_session=spark, ) ) df_cohort.shape @@ -307,8 +318,11 @@ # COMMAND ---------- # specific follow-ups, for example +# df_course["academic_year"].value_counts(normalize=True, dropna=False) +# df_course["academic_term"].value_counts(normalize=True, dropna=False) # df_course["grade"].value_counts(normalize=True, dropna=False) # df_course["delivery_method"].value_counts(normalize=True, dropna=False) +# df_course["course_name"].value_counts(normalize=True, dropna=False).head(10) # COMMAND ---------- @@ -317,8 +331,8 @@ # COMMAND ---------- # specific follow-ups, for example -# df_course["cohort"].value_counts(normalize=True, dropna=False) -# df_course["enrollment_type"].value_counts(normalize=True, dropna=False) +# df_cohort["cohort"].value_counts(normalize=True, dropna=False) +# df_cohort["enrollment_type"].value_counts(normalize=True, dropna=False) # COMMAND ---------- @@ -509,6 +523,10 @@ # COMMAND ---------- +df_pre_cohort["enrollment_type"].value_counts() + +# COMMAND ---------- + # MAGIC %md # MAGIC ### filter invalid rows(?) 
@@ -574,6 +592,7 @@ ax = sb.histplot( df_course.sort_values(by="academic_year"), + # df_course_valid.sort_values(by="academic_year"), y="academic_year", hue="academic_term", multiple="stack", @@ -645,6 +664,7 @@ ax = sb.histplot( pd.merge( df_course.groupby("student_guid") + # df_course_valid.groupby("student_guid") .size() .rename("num_courses_enrolled") .reset_index(drop=False), @@ -667,6 +687,9 @@ df_course.groupby("student_guid").agg( {"number_of_credits_attempted": "sum", "number_of_credits_earned": "sum"} ), + # df_course_valid.groupby("student_guid").agg( + # {"number_of_credits_attempted": "sum", "number_of_credits_earned": "sum"} + # ), x="number_of_credits_attempted", y="number_of_credits_earned", kind="hex", @@ -769,7 +792,10 @@ # COMMAND ---------- # MAGIC %md -# MAGIC - [ ] Add school-specific data schemas and/or preprocessing functions into the appropriate directory in the [`student-success-intervention` repository](https://github.com/datakind/student-success-intervention) -# MAGIC - ... 
+# MAGIC - [ ] If you haven't already, add school-specific data schemas and/or preprocessing functions into the appropriate directory in the [`student-success-intervention` repository](https://github.com/datakind/student-success-intervention) +# MAGIC - [ ] Add file paths for the raw course/cohort datasets to the project config file's `datasets.labeled.raw_course` and `datasets.labeled.raw_cohort` blocks +# MAGIC - [ ] Submit a PR including this notebook and any school-specific files added in order to run it # COMMAND ---------- + + diff --git a/notebooks/pdp/02-prepare-modeling-dataset-TEMPLATE.py b/notebooks/pdp/02-prepare-modeling-dataset-TEMPLATE.py index 95fa5805..ac3a0f0d 100644 --- a/notebooks/pdp/02-prepare-modeling-dataset-TEMPLATE.py +++ b/notebooks/pdp/02-prepare-modeling-dataset-TEMPLATE.py @@ -22,8 +22,11 @@ # COMMAND ---------- -# install dependencies, most of which should come through our 1st-party SST package -# %pip install git+https://github.com/datakind/student-success-tool.git@develop +# install dependencies, most/all of which should come through our 1st-party SST package +# NOTE: it's okay to use 'develop' or a feature branch while developing this nb +# but when it's finished, it's best to pin to a specific version of the package +# %pip install "student-success-tool == 0.1.0" +# %pip install "git+https://github.com/datakind/student-success-tool.git@develop" # COMMAND ---------- @@ -56,7 +59,7 @@ # COMMAND ---------- # MAGIC %md -# MAGIC ## `student-success-intervention` hacks +# MAGIC ## import school-specific code # COMMAND ---------- @@ -65,7 +68,7 @@ # COMMAND ---------- -# HACK: insert our 1st-party (school-specific) code into PATH +# insert our 1st-party (school-specific) code into PATH if "../" not in sys.path: sys.path.insert(1, "../") @@ -74,24 +77,12 @@ # COMMAND ---------- -# MAGIC %md -# MAGIC ## project config - -# COMMAND ---------- - -# TODO: create a config file in TOML format to school directory -config = 
configs.load_config("./config.toml", schema=configs.PDPProjectConfig) -config - -# COMMAND ---------- - -catalog = "sst_dev" - -# configure where data is to be read from / written to -inst_name = "SCHOOL" # TODO: fill in school's name in Unity Catalog -schema = f"{inst_name}_silver" -catalog_schema = f"{catalog}.{schema}" -print(f"{catalog_schema=}") +# project configuration should be stored in a config file in TOML format +# it'll start out with just basic info: institution_id, institution_name +# but as each step of the pipeline gets built, more parameters will be moved +# from hard-coded notebook variables to shareable, persistent config fields +cfg = configs.load_config("./config-v2-TEMPLATE.toml", configs.PDPProjectConfigV2) +cfg # COMMAND ---------- @@ -130,6 +121,20 @@ # COMMAND ---------- +try: + feature_params = cfg.preprocessing.features.model_dump() +except AttributeError: + feature_params = { + "min_passing_grade": pdp.constants.DEFAULT_MIN_PASSING_GRADE, + "min_num_credits_full_time": pdp.constants.DEFAULT_MIN_NUM_CREDITS_FULL_TIME, + "course_level_pattern": pdp.constants.DEFAULT_COURSE_LEVEL_PATTERN, + "peak_covid_terms": pdp.constants.DEFAULT_PEAK_COVID_TERMS, + "key_course_subject_areas": None, + "key_course_ids": None, + } + +# COMMAND ---------- + dict(config.prepare_modeling_dataset) # COMMAND ---------- diff --git a/notebooks/pdp/04-make-explain-predictions-TEMPLATE.py b/notebooks/pdp/04-make-explain-predictions-TEMPLATE.py new file mode 100644 index 00000000..fd0c29cc --- /dev/null +++ b/notebooks/pdp/04-make-explain-predictions-TEMPLATE.py @@ -0,0 +1,325 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # SST Make and Explain Predictions: [SCHOOL] +# MAGIC +# MAGIC Fourth step in the process of transforming raw (PDP) data into actionable, data-driven insights for advisors: generate predictions and feature importances for new (unlabeled) data. 
+# MAGIC +# MAGIC ### References +# MAGIC +# MAGIC - [Data science product components (Confluence doc)](https://datakind.atlassian.net/wiki/spaces/TT/pages/237862913/Data+science+product+components+the+modeling+process) +# MAGIC - [Databricks runtimes release notes](https://docs.databricks.com/en/release-notes/runtime/index.html) +# MAGIC - [SCHOOL WEBSITE](https://example.com) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # setup + +# COMMAND ---------- + +# MAGIC %sh python --version + +# COMMAND ---------- + +# install dependencies, most of which should come through our 1st-party SST package +# %pip install "student-success-tool==0.1.0" --no-deps +# %pip install git+https://github.com/datakind/student-success-tool.git@develop --no-deps +# %pip install pandera + +# COMMAND ---------- + +# MAGIC %restart_python + +# COMMAND ---------- + +import functools as ft +import logging +import typing as t + +import mlflow +import numpy as np +import pandas as pd +import shap +import sklearn.inspection +import sklearn.metrics +from databricks.connect import DatabricksSession + +# from databricks.sdk.runtime import dbutils +# from py4j.protocol import Py4JJavaError +# from pyspark import SparkContext +# from pyspark.sql import SparkSession +from pyspark.sql.types import FloatType, StringType, StructField, StructType + +from student_success_tool.analysis.pdp import dataio +from student_success_tool.modeling import inference, utils + +# COMMAND ---------- + +logging.getLogger("root").setLevel(logging.INFO) +logging.getLogger("py4j").setLevel(logging.WARNING) # ignore databricks logger + +try: + spark_session = DatabricksSession.builder.getOrCreate() +except Exception: + logging.warning("unable to create spark session; are you in a Databricks runtime?") + pass + +# COMMAND ---------- + +# Databricks logs every instance that uses sklearn or other modelling libraries +# to MLFlow experiments... 
which we don't want +mlflow.autolog(disable=True) +mlflow.sklearn.autolog(disable=True) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## configuration + +# COMMAND ---------- + +# TODO TODO TODO: use project config + +train_sample_size = 100 +validate_sample_size = 100 + +institution_id = "INSTITUTION_ID" +best_model_run_id = "BEST_MODEL_RUN_ID" +student_id_col = "student_guid" +target_col = "target" +split_col = "split" +sample_weight_col = "sample_weight" +pos_label = True + +model_type = "sklearn" +labeled_data_path = "CATALOG.SCHEMA.TABLE_NAME" # "TODO" +unlabeled_data_path = None + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # load model and data + +# COMMAND ---------- + + +# TODO: move this into sst package +def mlflow_load_model(model_uri: str, model_type: str): + load_model_func = ( + mlflow.sklearn.load_model + if model_type == "sklearn" + else mlflow.xgboost.load_model + if model_type == "xgboost" + else mlflow.lightgbm.load_model + if model_type == "lightgbm" + else mlflow.pyfunc.load_model + ) + model = load_model_func(f"runs:/{best_model_run_id}/model") + logging.info("mlflow '%s' model loaded from '%s'", model_type, model_uri) + return model + + +def predict_proba( + df: pd.DataFrame, *, model, pos_label: bool | str = True +) -> pd.Series: + return pd.Series( + model.predict_proba(df)[:, model.classes_.tolist().index(pos_label)] + ) + + +# COMMAND ---------- + +model = mlflow_load_model(f"runs:/{best_model_run_id}/model", model_type) +model_features = model.named_steps["column_selector"].get_params()["cols"] +logging.info( + "model uses %s features: %s", len(model_features), ", ".join(model_features) +) + +# COMMAND ---------- + +model_features = model.named_steps["column_selector"].get_params()["cols"] +print(len(model_features)) + +# COMMAND ---------- + +df_labeled = dataio.read_data_from_delta_table( + labeled_data_path, spark_session=spark_session +) +print(df_labeled.shape) +df_labeled.head() + +# COMMAND ---------- + +if 
unlabeled_data_path: + df_unlabeled = dataio.read_data_from_delta_table( + unlabeled_data_path, spark_session=spark_session + ) +else: + df_unlabeled = df_labeled.loc[df_labeled[split_col].eq("test"), :].drop( + columns=target_col + ) +print(df_unlabeled.shape) +df_unlabeled.head() + +# COMMAND ---------- + +pred_probs = predict_proba(df_unlabeled, model=model) +pred_probs.describe() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC # initialize SHAP explainer + +# COMMAND ---------- + +df_train = df_labeled.loc[df_labeled[split_col].eq("train"), :] +# SHAP can't explain models using data with nulls +# so, impute nulls using the mode (most frequent values) +mode = df_train.mode().iloc[0] +# sample background data for SHAP Explainer +train_sample = ( + df_train.sample(n=min(train_sample_size, df_train.shape[0]), random_state=1) + .fillna(mode) + .loc[:, model_features] +) +train_sample + +# COMMAND ---------- + + +def predict_proba_v3( + X, + *, + model, + col_names: t.Optional[list[str]] = None, + pos_label: t.Optional[bool | str] = None, +) -> np.ndarray: + if col_names is None: + col_names = model.named_steps["column_selector"].get_params()["cols"] + pred_probs = model.predict_proba(pd.DataFrame(data=X, columns=col_names)) + if pos_label is not None: + return pred_probs[:, model.classes_.tolist().index(pos_label)] + else: + return pred_probs + + +def predict_proba_v2(X, *, model, pos_label: bool | str = True): + model_features = model.named_steps["column_selector"].get_params()["cols"] + pred_probs = model.predict_proba(pd.DataFrame(data=X, columns=model_features)) + return pred_probs[:, model.classes_.tolist().index(pos_label)] + + +# COMMAND ---------- + +# import shap +# import sklearn + +# X, y = shap.datasets.adult() +# m = sklearn.linear_model.LogisticRegression().fit(X, y) +# explainer = shap.explainers.Permutation(m.predict_proba, X) +# shap_values = explainer(X[:100]) +# shap.plots.bar(shap_values[..., 1]) + +# COMMAND ---------- + +# explainer = 
shap.explainers.Permutation(model.predict_proba, train_sample) +# explainer = shap.explainers.KernelExplainer(model.predict_proba, train_sample) +explainer = shap.explainers.KernelExplainer( + ft.partial( + predict_proba_v3, model=model, col_names=model_features, pos_label=pos_label + ), + train_sample, + link="identity", +) +explainer + +# COMMAND ---------- + +shap_schema = StructType( + [StructField(student_id_col, StringType(), nullable=False)] + + [StructField(col, FloatType(), nullable=False) for col in model_features] +) + +df_shap_values = ( + spark.createDataFrame(df_unlabeled.drop(columns=[split_col, sample_weight_col])) # noqa: F821 + .repartition(sc.defaultParallelism) # noqa: F821 + .mapInPandas( + ft.partial( + inference.calculate_shap_values_spark_udf, + student_id_col=student_id_col, + model_features=model_features, + explainer=explainer, + mode=mode, + ), + schema=shap_schema, + ) + .toPandas() + .set_index(student_id_col) + .reindex(df_unlabeled[student_id_col]) + .reset_index(drop=False) +) +df_shap_values + +# COMMAND ---------- + +shap.summary_plot( + df_shap_values[model_features].to_numpy(), + df_unlabeled[model_features], + class_names=model.classes_, + # show=False, ??? 
+) + +# COMMAND ---------- + +features_table = utils.load_features_table("assets/pdp/features_table.toml") +result = inference.select_top_features_for_display( + df_unlabeled.loc[:, model_features], + df_unlabeled[student_id_col], + pred_probs, + df_shap_values[model_features].to_numpy(), + n_features=5, + features_table=features_table, + needs_support_threshold_prob=0.5, +) +result + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## TODO: +# MAGIC +# MAGIC - save plots and results in a nice form in a place that makes sense + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## haxx + +# COMMAND ---------- + + +result = sklearn.inspection.permutation_importance( + model, + train_sample.drop(columns=target_col), + train_sample[target_col], + scoring=sklearn.metrics.make_scorer( + sklearn.metrics.log_loss, greater_is_better=False + ), + n_repeats=10, +) + +# COMMAND ---------- + +sorted_importances_idx = result.importances_mean.argsort() +importances = pd.DataFrame( + result.importances[sorted_importances_idx].T, + columns=train_sample.columns[sorted_importances_idx], +) +ax = importances.plot.box(vert=False, whis=10, figsize=(10, 10)) +ax.set_title("Permutation Importances (test set)") +ax.axvline(x=0, color="k", linestyle="--") +ax.set_xlabel("Decrease in accuracy score") +ax.figure.tight_layout() + +# COMMAND ---------- diff --git a/notebooks/pdp/config-v2-TEMPLATE.toml b/notebooks/pdp/config-v2-TEMPLATE.toml new file mode 100644 index 00000000..f74c2a25 --- /dev/null +++ b/notebooks/pdp/config-v2-TEMPLATE.toml @@ -0,0 +1,59 @@ +institution_id = "INST_ID" +institution_name = "INST_NAME" + +student_id_col = "student_guid" +target_col = "target" +split_col = "split" +sample_weight_col = "sample_weight" +student_group_cols = ["student_age", "race", "ethnicity", "gender", "first_gen"] +pred_col = "pred" +pred_prob_col = "pred_prob" +pos_label = true +random_state = 12345 + +[datasets.labeled.raw_course] +file_path = 
"/Volumes/CATALOG/INST_NAME_bronze/INST_NAME_bronze_file_volume/FILE_NAME_COURSE.csv" + +[datasets.labeled.raw_cohort] +file_path = "/Volumes/CATALOG/INST_NAME_bronze/INST_NAME_bronze_file_volume/FILE_NAME_COHORT.csv" + +[preprocessing] +splits = { train = 0.6, test = 0.2, validate = 0.2 } +sample_class_weight = "balanced" + +[preprocessing.features] +min_passing_grade = 1.0 +min_num_credits_full_time = 12 +course_level_pattern = 'asdf' +key_course_subject_areas = ["24", "51"] +key_course_ids = ["ENGL101", "MATH101"] + +[preprocessing.target.params] +min_num_credits_checkin = 30.0 +min_num_credits_target = 60.0 + +[preprocessing.target.student_criteria] +enrollment_type = "FIRST-TIME" +credential_type_sought_year_1 = "Bachelor's Degree" + +[labeled_dataset.preprocessed] +table_path = "CATALOG.SCHEMA.TABLE_NAME" + +[modeling.feature_selection] +incomplete_threshold = 0.5 +low_variance_threshold = 0.0 +collinear_threshold = 10.0 + +[modeling.training] +# exclude_frameworks = ["xgboost", "lightgbm"] +primary_metric = "log_loss" +timeout_minutes = 10 + +[trained_model] +experiment_id = "EXPERIMENT_ID" +run_id = "RUN_ID" +# model_type = "sklearn" +min_prob_pos_label = 0.5 + +[inference] +num_top_features = 5 diff --git a/src/student_success_tool/analysis/pdp/schemas/raw_cohort.py b/src/student_success_tool/analysis/pdp/schemas/raw_cohort.py index 518731d5..6b4c3455 100644 --- a/src/student_success_tool/analysis/pdp/schemas/raw_cohort.py +++ b/src/student_success_tool/analysis/pdp/schemas/raw_cohort.py @@ -74,7 +74,9 @@ class RawPDPCohortDataSchema(pda.DataFrameModel): dtype_kwargs={"categories": ["FIRST-TIME", "RE-ADMIT", "TRANSFER-IN"]}, ) # NOTE: categories set in a parser, which forces "UK" / "UNKNOWN" values to null - enrollment_intensity_first_term: pt.Series[pd.CategoricalDtype] = pda.Field() + enrollment_intensity_first_term: pt.Series[pd.CategoricalDtype] = pda.Field( + nullable=True + ) # NOTE: categories set in a parser, which forces "UK" values to null 
class FeaturesConfig(pyd.BaseModel):
    """Parameters controlling how features are computed from PDP data."""

    min_passing_grade: float = pyd.Field(
        default=constants.DEFAULT_MIN_PASSING_GRADE,
        description="Minimum numeric grade considered by institution as 'passing'",
        gt=0.0,
        lt=4.0,
    )
    min_num_credits_full_time: float = pyd.Field(
        default=constants.DEFAULT_MIN_NUM_CREDITS_FULL_TIME,
        description=(
            "Minimum number of credits *attempted* per term for a student's "
            "enrollment intensity to be considered 'full-time'."
        ),
        gt=0.0,
        lt=20.0,
    )
    course_level_pattern: str = pyd.Field(
        default=constants.DEFAULT_COURSE_LEVEL_PATTERN,
        description=(
            "Regular expression pattern that extracts a course's 'level' "
            "from a PDP course_number field"
        ),
    )
    peak_covid_terms: set[tuple[str, str]] = pyd.Field(
        default=constants.DEFAULT_PEAK_COVID_TERMS,
        description=(
            "Set of (academic year, academic term) pairs considered by institution "
            "as 'peak' COVID, for use in control variables to account for pandemic effects"
        ),
    )
    key_course_subject_areas: t.Optional[list[str]] = pyd.Field(
        default=None,
        description=(
            "One or more course subject areas (formatted as 2-digit CIP codes) "
            "for which custom features should be computed"
        ),
    )
    key_course_ids: t.Optional[list[str]] = pyd.Field(
        default=None,
        description=(
            "One or more course ids (formatted as '[COURSE_PREFIX][COURSE_NUMBER]') "
            "for which custom features should be computed"
        ),
    )
class PreprocessingConfig(pyd.BaseModel):
    """Parameters for preprocessing: features, target, data splits, and sample weights."""

    features: FeaturesConfig
    target: TargetConfig
    splits: dict[t.Literal["train", "test", "validate"], float] = pyd.Field(
        default={"train": 0.6, "test": 0.2, "validate": 0.2},
        description=(
            "Mapping of name to fraction of the full dataset belonging to a given 'split', "
            "which is a randomized subset used for different parts of the modeling process"
        ),
    )
    sample_class_weight: t.Optional[t.Literal["balanced"] | dict[object, int]] = (
        pyd.Field(
            default=None,
            description=(
                "Weights associated with classes in the form ``{class_label: weight}`` "
                "or 'balanced' to automatically adjust weights inversely proportional "
                "to class frequencies in the input data. "
                "If null (default), then sample weights are not computed."
            ),
        )
    )

    @pyd.field_validator("splits", mode="after")
    @classmethod
    def check_split_fractions(cls, value: dict) -> dict:
        """
        Check that the split fractions sum up to 1.0.

        Raises:
            ValueError: If the fractions don't sum up to 1.0 (within a small
                tolerance); pydantic reports this as a validation error.
        """
        sum_fracs = sum(value.values())
        # compare with a tolerance, since valid fractions may not sum to exactly 1.0
        # in floating point, e.g. 0.7 + 0.2 + 0.1 == 0.9999999999999999
        if abs(sum_fracs - 1.0) > 1e-9:
            # validators signal failure by raising ValueError, which pydantic collects
            # into a ValidationError; pydantic's ValidationError itself can't be
            # instantiated directly from a message
            raise ValueError(
                f"split fractions must sum up to 1.0, but input sums up to {sum_fracs}"
            )
        return value
class DatasetIOConfig(pyd.BaseModel):
    """Location of a dataset: a Unity Catalog table and/or a file on disk."""

    table_path: t.Optional[str] = pyd.Field(
        default=None,
        description=(
            "Path to a table in Unity Catalog where dataset is stored, "
            "including the full three-level namespace: 'CATALOG.SCHEMA.TABLE'"
        ),
    )
    file_path: t.Optional[str] = pyd.Field(
        default=None,
        description="Full, absolute path to dataset on disk, e.g. a Databricks Volume",
    )
    # TODO: if/when we allow different file formats, add this parameter ...
    # file_format: t.Optional[t.Literal["csv", "parquet"]] = pyd.Field(default=None)

    @pyd.model_validator(mode="after")
    def check_some_nonnull_inputs(self):
        """
        Check that at least one of ``table_path`` and ``file_path`` is specified.

        Raises:
            ValueError: If both paths are null; pydantic reports this
                as a validation error.
        """
        if self.table_path is None and self.file_path is None:
            # validators signal failure by raising ValueError, which pydantic collects
            # into a ValidationError; pydantic's ValidationError itself can't be
            # instantiated directly from a message
            raise ValueError("table_path and/or file_path must be non-null")
        return self
def load_mlflow_model(
    model_uri: str,
    model_type: t.Optional[t.Literal["sklearn", "xgboost", "lightgbm"]] = None,
) -> object:
    """
    Load an MLflow model from ``model_uri`` using the loader for ``model_type``.

    Args:
        model_uri: Location of the model to load; passed as-is to the loader.
        model_type: Model flavor whose ``load_model`` function is used. If None
            (or any unlisted value), the generic ``mlflow.pyfunc`` loader is used.

    Returns:
        Whatever the selected ``load_model`` function returns.

    References:
        - https://mlflow.org/docs/latest/python_api/mlflow.sklearn.html#mlflow.sklearn.load_model
        - https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.load_model
    """
    # only the selected flavor's module attribute is touched
    if model_type == "sklearn":
        loader = mlflow.sklearn.load_model
    elif model_type == "xgboost":
        loader = mlflow.xgboost.load_model
    elif model_type == "lightgbm":
        loader = mlflow.lightgbm.load_model
    else:
        loader = mlflow.pyfunc.load_model
    loaded = loader(model_uri)
    LOGGER.info("mlflow model loaded from '%s'", model_uri)
    return loaded