
# Copyright 2017 Match Group, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.
#
# Passing untrusted user input may have unintended consequences. Not
# designed to consume input from unknown sources (i.e., the public
# internet).
#
# This file has been modified from the original release by Match Group
# LLC. A description of changes may be found in the change log
# accompanying this source code.

"""Single-response GAM and the ADMM orchestration loop.

This module contains :class:`GAM` -- the user-facing model -- together
with the family / link tables and the ADMM driver that ties per-feature
primal steps to the per-outcome proximal step.

The public entry points are:

* :class:`GAM` -- configure with a family and link, register features
  with :meth:`GAM.add_feature`, fit with :meth:`GAM.fit`, then call
  :meth:`GAM.predict`, :meth:`GAM.deviance`, :meth:`GAM.aic`,
  :meth:`GAM.summary`, etc.
* :func:`fit_adaptive_lasso` -- two-stage adaptive-lasso wrapper that
  refits with reweighted L1 penalties.

The algorithm follows Chu, Keshavarz, & Boyd's distributed-fitting
paper (citation key ``GAMADMM`` in :class:`GAM`'s docstring); the
convexity-only design rule is captured in CLAUDE.md.
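
A minimal usage sketch (hedged: the data and feature names are
placeholders, and :class:`GAM` is assumed importable from
``gamdist.gamdist``)::

    from gamdist.gamdist import GAM

    model = GAM(family="poisson")              # canonical log link
    model.add_feature("age", "spline")
    model.add_feature("country", "categorical")
    model.fit(X_train, y_train)                # pandas DataFrame, count response
    mu_hat = model.predict(X_test)             # predicted mean response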
"""

from __future__ import annotations

import os
import pickle
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Literal

import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.linalg as linalg
import scipy.optimize as optimize
import scipy.special as special
import scipy.stats as stats

from . import proximal_operators as po
from .categorical_feature import _CategoricalFeature
from .feature import _Feature
from .linear_feature import _LinearFeature
from .spline_feature import _SplineFeature

FloatArray = npt.NDArray[np.float64]

Family = Literal[
    "normal",
    "binomial",
    "poisson",
    "gamma",
    "exponential",
    "inverse_gaussian",
    "quasi_binomial",
    "quasi_poisson",
    "quantile",
    "huber",
]
Link = Literal[
    "identity",
    "logistic",
    "probit",
    "complementary_log_log",
    "log",
    "reciprocal",
    "reciprocal_squared",
]
FeatureType = Literal["categorical", "linear", "spline"]

# Open work and historical "done" list now live in the GitHub issue
# tracker and changelog.txt respectively; see
# https://github.com/rwilson4/gamdist/issues for the current roadmap.

FAMILIES = [
    "normal",
    "binomial",
    "poisson",
    "gamma",
    "exponential",
    "inverse_gaussian",
    "quasi_binomial",
    "quasi_poisson",
    "quantile",
    "huber",
]

LINKS = [
    "identity",
    "logistic",
    "probit",
    "complementary_log_log",
    "log",
    "reciprocal",
    "reciprocal_squared",
]

FAMILIES_WITH_KNOWN_DISPERSIONS = {
    "binomial": 1,
    "poisson": 1,
    "quantile": 1,
    "huber": 1,
}

CANONICAL_LINKS = {
    "normal": "identity",
    "binomial": "logistic",
    "poisson": "log",
    "gamma": "reciprocal",
    "inverse_gaussian": "reciprocal_squared",
    "quasi_binomial": "logistic",
    "quasi_poisson": "log",
    "quantile": "identity",
    "huber": "identity",
}

# Quasi-likelihood families share the score function (and therefore the
# per-observation prox, deviance, and variance function) with a "base"
# full-likelihood family; only the dispersion parameter differs.
# ``_base_family`` is what the per-observation kernels dispatch on, while
# ``_family`` keeps the user-facing name so ``dispersion()`` knows
# whether to estimate phi by Pearson chi-squared / (n - p) instead of
# fixing it at one.
QUASI_BASE_FAMILY = {
    "quasi_binomial": "binomial",
    "quasi_poisson": "poisson",
}
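
# For example, ``GAM(family="quasi_poisson")`` produces the same point
# estimates as ``GAM(family="poisson")`` (identical prox and deviance),
# but ``dispersion()`` returns Pearson chi-squared / (n - p) rather than
# the fixed value 1.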

# (family, link) pairs that have a real, convex proximal-operator
# implementation. Convexity of every per-component subproblem is the
# North Star (see CLAUDE.md): ``GAM`` rejects any other combination at
# construction time so users find out before ``fit()`` runs. Any new
# entry here needs (a) a proof that the prox subproblem is convex and
# (b) a dedicated solver in ``proximal_operators.py`` -- the previous
# silent ``scipy.optimize.minimize_scalar`` fallback is gone.
SUPPORTED_FAMILY_LINK_PAIRS: frozenset[tuple[str, str]] = frozenset(
    CANONICAL_LINKS.items()
)
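
# A construction-time sketch of the allow-list (illustrative):
#
#     GAM(family="binomial")               # canonical logistic link -- OK
#     GAM(family="binomial", link="log")   # not on the allow-list; raises
#                                          # ValueError before fit() runs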


def _plot_convergence(
    prim_res: list[float],
    prim_tol: list[float],
    dual_res: list[float],
    dual_tol: list[float],
    dev: list[float],
) -> None:
    """Plot ADMM convergence progress.

    Convergence is declared when the primal and dual residuals fall
    below tolerances computed from the data as in [ADMM]_. Some
    analysts prefer to declare convergence based on changes to the
    deviance, so this is also plotted -- specifically
    :math:`\\log(\\mathrm{dev} - \\mathrm{dev}_\\mathrm{final})`,
    with a small constant added to avoid :math:`\\log 0`.

    Parameters
    ----------
    prim_res : array-like
        Primal residuals after each iteration.
    prim_tol : array-like
        Primal tolerances after each iteration.
    dual_res : array-like
        Dual residuals after each iteration.
    dual_tol : array-like
        Dual tolerances after each iteration.
    dev : array-like
        Deviances after each iteration.
    """
    import matplotlib.pyplot as plt

    dev_arr = np.asarray(dev, dtype=float)
    fig = plt.figure(figsize=(12.0, 10.0))

    ax = fig.add_subplot(211)
    ax.plot(range(len(prim_res)), prim_res, "b-", label="Primal Residual")
    ax.plot(range(len(prim_tol)), prim_tol, "b--", label="Primal Tolerance")
    ax.plot(range(len(dual_res)), dual_res, "r-", label="Dual Residual")
    ax.plot(range(len(dual_tol)), dual_tol, "r--", label="Dual Tolerance")
    ax.set_yscale("log")
    plt.xlabel("Iteration", fontsize=24)
    plt.ylabel("Residual", fontsize=24)
    plt.legend(fontsize=24, loc=3)

    ax = fig.add_subplot(212)
    ax.plot(
        range(len(dev_arr)), (dev_arr - dev_arr[-1]) + 1e-10, "b-", label="Deviance"
    )
    ax.set_yscale("log")
    plt.xlabel("Iteration", fontsize=24)
    plt.ylabel("Deviance Suboptimality", fontsize=24)

    plt.gcf().subplots_adjust(bottom=0.1)
    plt.gcf().subplots_adjust(left=0.1)
    if os.environ.get("MPLBACKEND", "") not in {"Agg", "agg"}:
        plt.show()  # pragma: no cover
    plt.close(fig)


def _gamma_dispersion(dof: float, dev: float, num_obs: int) -> float:
    """Gamma dispersion.

    Solves the gamma likelihood equation
        2 * num_obs * (log nu - psi(nu)) - dof / nu = dev
    for nu. The left-hand side minus the right-hand side is monotonically
    decreasing in nu and crosses zero exactly once for valid inputs
    (``dev > 0`` and ``2 * num_obs > dof``), so Brent's method on a wide
    bracket converges reliably.

    The previous damped-Newton implementation used a fixed step of 0.1
    and tol=1e-6 and ran out of iterations on perfectly reasonable
    inputs (e.g. ``dof=3, dev=10, num_obs=50``).

    Parameters
    ----------
    dof : float
        Effective degrees of freedom of the fitted model.
    dev : float
        Total deviance.
    num_obs : int
        Number of observations.

    Returns
    -------
    nu : float
        The shape-parameter MLE. (The function name is a misnomer kept
        for backwards compatibility -- callers expect the shape, which
        is the inverse of the dispersion phi.)

    Raises
    ------
    ValueError
        When the equation has no positive root for the given inputs.
    """
    if dev <= 0.0:
        raise ValueError("Could not estimate gamma dispersion: non-positive deviance.")

    def f(nu: float) -> float:
        return 2.0 * num_obs * (np.log(nu) - special.psi(nu)) - dof / nu - dev

    lo, hi = 1e-8, 1e8
    if f(lo) * f(hi) > 0:
        raise ValueError(
            "Could not estimate gamma dispersion: no sign change in bracket."
        )
    return float(optimize.brentq(f, lo, hi, xtol=1e-10, rtol=1e-10))
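
# Worked example (illustrative values): with dof=3, dev=10, num_obs=50,
# Brent's method finds the root of
#     f(nu) = 2 * 50 * (log(nu) - psi(nu)) - 3 / nu - 10
# inside [1e-8, 1e8]. Since log(nu) - psi(nu) ~ 1/nu as nu -> 0+ and
# 2 * num_obs > dof, f is positive at the left endpoint and tends to
# -dev < 0 as nu -> inf, so the bracket always contains the root:
#
#     nu = _gamma_dispersion(dof=3.0, dev=10.0, num_obs=50)
#     phi = 1.0 / nu   # dispersion estimate is the inverse of the shape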


class GAM:
    """Generalized Additive Model fit via ADMM.

    Configure with a family and link, register features with
    :meth:`add_feature`, fit with :meth:`fit`, then call
    :meth:`predict`, :meth:`deviance`, :meth:`aic`, :meth:`summary`,
    etc.

    Parameters
    ----------
    family : str, optional
        Family of the model. One of:

        * ``"normal"`` -- continuous responses.
        * ``"binomial"`` -- binary responses (or counts with
          ``covariate_class_sizes`` at fit time).
        * ``"poisson"`` -- counts.
        * ``"gamma"`` -- positive continuous responses (in progress).
        * ``"exponential"`` -- gamma with dispersion fixed at 1.
        * ``"inverse_gaussian"`` -- positive continuous responses with
          a cubic mean--variance relationship (in progress).
        * ``"quasi_binomial"`` -- binomial score; dispersion estimated
          from the data via Pearson :math:`\\chi^2 / (n - p)` so
          :meth:`dispersion` reflects over- or under-dispersion
          relative to the exact-binomial baseline of 1.
        * ``"quasi_poisson"`` -- Poisson score; same Pearson dispersion
          estimator, useful when count data has ``variance > mean``.
        * ``"quantile"`` -- pinball loss; requires ``tau``.
        * ``"huber"`` -- robust M-estimator; requires ``delta``.

        Multinomial (ordinal / nominal) and proportional-hazards
        families are not yet supported. Required unless loading an
        existing model from file (see ``load_from_file``). Point
        estimates for ``"quasi_binomial"`` / ``"quasi_poisson"``
        coincide with their full-likelihood cousins; only the
        dispersion estimator (and any inferential quantity that
        depends on it) differs.
    link : str, optional
        Link function. Supported links and their canonical families:

        ====================== ====================
        Link                   Canonical for family
        ====================== ====================
        ``identity``           ``normal``
        ``logistic``           ``binomial``
        ``log``                ``poisson``
        ``reciprocal``         ``gamma``
        ``reciprocal_squared`` ``inverse_gaussian``
        ====================== ====================

        ``probit`` and ``complementary_log_log`` are recognized link
        names, but they have no dedicated convex proximal operator
        yet, so pairing them with ``binomial`` is rejected at
        construction time. If not specified, the canonical link is
        used. Per the convexity-only design rule (CLAUDE.md), only
        ``(family, link)`` pairs with a dedicated convex
        proximal-operator implementation are accepted.
    dispersion : float, optional
        Dispersion parameter. Some families (``binomial``,
        ``poisson``) have a fixed dispersion independent of the data.
        For other families the dispersion is typically estimated from
        the data; passing a known value here reduces uncertainty in
        the model.
    estimate_overdispersion : bool, optional
        Whether to estimate over-dispersion for binomial and poisson
        families. Only meaningful when covariate classes are present
        and have at least modest size. See [GLM]_, S4.5. Defaults to
        ``False``.
    name : str, optional
        Name for the model. Used in plots and in deriving filenames
        when ``save_flag=True`` is passed to :meth:`fit`.
    load_from_file : str, optional
        Pickle path produced by a previous ``save_flag=True`` fit.
        When set, every other parameter is ignored.
    tau : float, optional
        Quantile level in :math:`(0, 1)` for ``family="quantile"``
        (pinball loss). ``tau=0.5`` recovers the conditional median.
        Required when ``family="quantile"`` and ignored otherwise.
    delta : float, optional
        Knee parameter for ``family="huber"``. Residuals with
        :math:`|y - \\mu| \\le \\delta` are penalized as
        :math:`0.5 r^2` (least squares); larger residuals are
        penalized linearly, capping their per-observation influence.
        Must be positive and is in the units of ``y``. Required when
        ``family="huber"`` and ignored otherwise.

    References
    ----------
    .. [glmnet] glmnet (R package). The standard package for GAMs in
       R. https://cran.r-project.org/web/packages/glmnet/index.html
    .. [pygam] pygam (Python package). Implements GAMs in Python
       without using ADMM. https://github.com/dswah/pyGAM
    .. [GLM] McCullagh, P. and Nelder, J. A. *Generalized Linear
       Models*. The standard text on GLMs.
    .. [GAM] Hastie, T. and Tibshirani, R. *Generalized Additive
       Models*. The book by the folks who invented GAMs.
    .. [ESL] Hastie, T., Tibshirani, R., and Friedman, J. *The
       Elements of Statistical Learning*. Covers a lot more than just
       GAMs.
    .. [GAMr] Wood, S. N. *Generalized Additive Models: An
       Introduction with R*. Covers more implementation details than
       [GAM]_.
    .. [ADMM] Boyd, S., Parikh, N., Chu, E., Peleato, B., and
       Eckstein, J. *Distributed Optimization and Statistical Learning
       via the Alternating Direction Method of Multipliers*.
    .. [GAMADMM] Chu, E., Keshavarz, A., and Boyd, S. *A Distributed
       Algorithm for Fitting Generalized Additive Models*. Forms the
       basis of this package's approach.
    """

    def __init__(
        self,
        family: Family | None = None,
        link: Link | None = None,
        dispersion: float | None = None,
        estimate_overdispersion: bool = False,
        name: str | None = None,
        load_from_file: str | None = None,
        tau: float | None = None,
        delta: float | None = None,
    ) -> None:
        if load_from_file is not None:
            self._load(load_from_file)
            return

        if family is None:
            raise ValueError("Family not specified.")
        elif family not in FAMILIES:
            raise ValueError(f"{family} family not supported")
        elif family == "exponential":
            # Exponential is a special case of Gamma with a dispersion of 1.
            self._family = "gamma"
            dispersion = 1.0
        else:
            self._family = family

        # quasi_* families share their score function (and therefore the
        # prox, deviance, and variance function) with a base family; only
        # the dispersion estimator differs. Per-observation kernels
        # dispatch on ``_base_family`` so they don't need to learn the
        # quasi names.
        self._base_family = QUASI_BASE_FAMILY.get(self._family, self._family)

        if link is None:
            self._link = CANONICAL_LINKS[self._family]
        elif link in LINKS:
            self._link = link
        else:
            raise ValueError(f"{link} link not supported")

        if self._family == "quantile":
            if tau is None:
                raise ValueError("tau must be specified for the quantile family.")
            if not (0.0 < tau < 1.0):
                raise ValueError(f"tau must be in (0, 1); got {tau}.")
            if self._link != "identity":
                raise ValueError(
                    "quantile family requires link='identity'; "
                    f"got link={self._link!r}."
                )
            self._tau: float | None = float(tau)
        else:
            self._tau = None

        if self._family == "huber":
            if delta is None:
                raise ValueError("delta must be specified for the huber family.")
            if not (delta > 0.0):
                raise ValueError(f"delta must be positive; got {delta}.")
            if self._link != "identity":
                raise ValueError(
                    f"huber family requires link='identity'; got link={self._link!r}."
                )
            self._delta: float | None = float(delta)
        else:
            self._delta = None

        if (self._family, self._link) not in SUPPORTED_FAMILY_LINK_PAIRS:
            canonical = CANONICAL_LINKS[self._family]
            raise ValueError(
                f"({self._family!r}, {self._link!r}) is not a supported "
                "(family, link) combination. Per the convexity-only design "
                "(see CLAUDE.md), only pairs with a dedicated convex "
                f"proximal-operator implementation are accepted; use "
                f"link={canonical!r} for family={self._family!r}."
            )

        if dispersion is not None:
            self._known_dispersion = True
            self._dispersion = dispersion
        elif (
            self._family in FAMILIES_WITH_KNOWN_DISPERSIONS
            and not estimate_overdispersion
        ):
            self._known_dispersion = True
            self._dispersion = FAMILIES_WITH_KNOWN_DISPERSIONS[self._family]
        else:
            self._known_dispersion = False

        if self._link == "identity":
            self._eval_link = lambda x: x
            self._eval_inv_link = lambda x: x
        elif self._link == "logistic":
            self._eval_link = lambda x: special.logit(x)
            self._eval_inv_link = lambda x: special.expit(x)
        elif self._link == "probit":
            # Inverse CDF of the Gaussian distribution
            self._eval_link = lambda x: stats.norm.ppf(x)
            self._eval_inv_link = lambda x: stats.norm.cdf(x)
        elif self._link == "complementary_log_log":
            self._eval_link = lambda x: np.log(-np.log(1.0 - x))
            self._eval_inv_link = lambda x: 1.0 - np.exp(-np.exp(x))
        elif self._link == "log":
            self._eval_link = lambda x: np.log(x)
            self._eval_inv_link = lambda x: np.exp(x)
        elif self._link == "reciprocal":
            self._eval_link = lambda x: 1.0 / x
            self._eval_inv_link = lambda x: 1.0 / x
        elif self._link == "reciprocal_squared":
            self._eval_link = lambda x: 1.0 / (x * x)
            self._eval_inv_link = lambda x: 1.0 / np.sqrt(x)

        self._estimate_overdispersion = estimate_overdispersion
        self._features: dict[str, _Feature] = {}
        self._offset = 0.0
        self._num_features = 0
        self._fitted = False
        self._name = name

    def _save(self) -> None:
        """Save model state to a pickle file."""
        mv: dict[str, Any] = {}
        mv["family"] = self._family
        mv["link"] = self._link
        mv["known_dispersion"] = self._known_dispersion
        if self._known_dispersion:
            mv["dispersion"] = self._dispersion
        mv["tau"] = self._tau
        mv["delta"] = self._delta
        mv["estimate_overdispersion"] = self._estimate_overdispersion
        mv["offset"] = self._offset
        mv["num_features"] = self._num_features
        mv["fitted"] = self._fitted
        mv["name"] = self._name

        features: dict[str, dict[str, Any]] = {}
        for name, feature in self._features.items():
            features[name] = {
                "type": feature.__type__,
                "filename": feature._filename,
            }
        mv["features"] = features

        mv["num_obs"] = self._num_obs
        mv["y"] = self._y
        mv["weights"] = self._weights
        mv["has_covariate_classes"] = self._has_covariate_classes
        if self._has_covariate_classes:
            mv["covariate_class_sizes"] = self._covariate_class_sizes
        mv["f_bar"] = self.f_bar
        mv["z_bar"] = self.z_bar
        mv["u"] = self.u
        mv["prim_res"] = self.prim_res
        mv["dual_res"] = self.dual_res
        mv["prim_tol"] = self.prim_tol
        mv["dual_tol"] = self.dual_tol
        mv["dev"] = self.dev

        filename = f"{self._name}_model.pckl"
        with open(filename, "wb") as f:
            pickle.dump(mv, f)

    def _load(self, filename: str) -> None:
        """Load a saved model from a pickle file."""
        with open(filename, "rb") as f:
            mv = pickle.load(f)

        self._filename = filename
        self._family = mv["family"]
        self._base_family = QUASI_BASE_FAMILY.get(self._family, self._family)
        self._link = mv["link"]
        if (self._family, self._link) not in SUPPORTED_FAMILY_LINK_PAIRS:
            canonical = CANONICAL_LINKS.get(self._family, "<unknown>")
            raise ValueError(
                f"Saved model uses ({self._family!r}, {self._link!r}), which "
                "is no longer a supported (family, link) combination. Per "
                "the convexity-only design (see CLAUDE.md), only pairs "
                "with a dedicated convex proximal-operator implementation "
                "are accepted; refit with link="
                f"{canonical!r} for family={self._family!r}."
            )
        self._known_dispersion = mv["known_dispersion"]
        if self._known_dispersion:
            self._dispersion = mv["dispersion"]
        self._tau = mv.get("tau")
        self._delta = mv.get("delta")
        self._estimate_overdispersion = mv["estimate_overdispersion"]
        self._offset = mv["offset"]
        self._num_features = mv["num_features"]
        self._fitted = mv["fitted"]
        self._name = mv["name"]

        self._features = {}
        features = mv["features"]
        for name, feature in features.items():
            if feature["type"] == "categorical":
                self._features[name] = _CategoricalFeature(
                    load_from_file=feature["filename"]
                )
            elif feature["type"] == "linear":
                self._features[name] = _LinearFeature(
                    load_from_file=feature["filename"]
                )
            elif feature["type"] == "spline":
                self._features[name] = _SplineFeature(
                    load_from_file=feature["filename"]
                )
            else:
                raise ValueError("Invalid feature type")

        self._num_obs = mv["num_obs"]
        self._y = mv["y"]
        self._weights = mv["weights"]
        self._has_covariate_classes = mv["has_covariate_classes"]
        if self._has_covariate_classes:
            self._covariate_class_sizes = mv["covariate_class_sizes"]
        self.f_bar = mv["f_bar"]
        self.z_bar = mv["z_bar"]
        self.u = mv["u"]
        self.prim_res = mv["prim_res"]
        self.dual_res = mv["dual_res"]
        self.prim_tol = mv["prim_tol"]
        self.dual_tol = mv["dual_tol"]
        self.dev = mv["dev"]

        if self._link == "identity":
            self._eval_link = lambda x: x
            self._eval_inv_link = lambda x: x
        elif self._link == "logistic":
            self._eval_link = lambda x: special.logit(x)
            self._eval_inv_link = lambda x: special.expit(x)
        elif self._link == "probit":
            # Inverse CDF of the Gaussian distribution
            self._eval_link = lambda x: stats.norm.ppf(x)
            self._eval_inv_link = lambda x: stats.norm.cdf(x)
        elif self._link == "complementary_log_log":
            self._eval_link = lambda x: np.log(-np.log(1.0 - x))
            self._eval_inv_link = lambda x: 1.0 - np.exp(-np.exp(x))
        elif self._link == "log":
            self._eval_link = lambda x: np.log(x)
            self._eval_inv_link = lambda x: np.exp(x)
        elif self._link == "reciprocal":
            self._eval_link = lambda x: 1.0 / x
            self._eval_inv_link = lambda x: 1.0 / x
        elif self._link == "reciprocal_squared":
            self._eval_link = lambda x: 1.0 / (x * x)
            self._eval_inv_link = lambda x: 1.0 / np.sqrt(x)

    def add_feature(
        self,
        name: str,
        type: FeatureType,
        transform: Callable[[npt.NDArray[Any]], npt.NDArray[Any]] | None = None,
        rel_dof: float | None = None,
        regularization: dict[str, Any] | None = None,
        constraints: dict[str, Any] | None = None,
    ) -> None:
        """Add a feature to the model.

        An implicit constant feature is always included, representing
        the overall average response.

        Parameters
        ----------
        name : str
            Name for the feature. Used internally to keep track of
            features and also used when saving files and in plots.
        type : str
            Type of feature. One of:

            * ``"categorical"`` -- for categorical variables.
            * ``"linear"`` -- for variables with a linear contribution
              to the response.
            * ``"spline"`` -- for variables with a potentially nonlinear
              contribution to the response.
        transform : callable, optional
            Optional transform applied to the feature data. Any callable
            may be used; it is applied to data provided during fitting
            and prediction. Common choices include :func:`numpy.log`,
            :func:`numpy.log1p`, or :func:`numpy.sqrt`. The user may
            wish to start with a base feature ``"age"`` and use derived
            features ``"age_linear"`` and ``"age_quadratic"`` to permit
            quadratic models for that feature, with potentially
            different regularization applied to each.
        rel_dof : float, optional
            Relative degrees of freedom. Applicable only to spline
            features. The degrees of freedom associated with a spline
            represent how "wiggly" it is allowed to be: a spline with
            two degrees of freedom is just a line. (Since features are
            constrained to have zero mean response over the data, linear
            features only have one degree of freedom.) The relative
            degrees of freedom set the baseline smoothing parameter
            :math:`\\lambda` associated with the feature. When
            :meth:`fit` is called, the user may specify an overall
            ``smoothing`` parameter applied to all features to alter the
            amount of regularization in the entire model; the actual
            degrees of freedom will vary based on the amount of
            smoothing. By default, splines have 4 relative degrees of
            freedom. Regularization of any feature effectively reduces
            the degrees of freedom; that adjustment is not yet wired in.
        regularization : dict, optional
            Dictionary specifying the regularization applied to this
            feature. Different types of features support different types
            of regularization. Splines always include a :math:`C^2`
            smoothness penalty controlled via ``rel_dof``;
            ``regularization={"group_lasso": {"coef": lam}}``
            additionally shrinks the entire spline contribution and can
            zero it out. ``group_lasso_inf`` is the
            :math:`L_\\infty`-norm variant
            (:math:`\\lambda \\|f_j\\|_\\infty`); it produces a clipping
            rather than a uniform contraction and is also available on
            linear and categorical features. ``huber`` is a
            bounded-influence ridge analogue:
            ``regularization={"huber": {"coef": lam, "delta": d}}`` adds
            :math:`\\lambda h_\\delta(\\mathrm{coef})` per parameter,
            with :math:`h_\\delta` quadratic for small magnitudes and
            linear beyond :math:`\\delta`. Available on linear and
            categorical features. Other features have more diverse
            options; see each feature class's docstring.
        constraints : dict, optional
            Optional convex shape constraints on the feature's
            coefficients. ``sign`` (``"nonnegative"`` /
            ``"nonpositive"``), ``lower``, and ``upper`` (floats) bound
            coefficients. ``monotonic`` (``"increasing"`` /
            ``"decreasing"``), ``convex``, and ``concave`` impose
            ordering / second-difference constraints (categorical
            features additionally require an ``order`` list of category
            labels; splines order along the knots). Linear features
            support only ``sign`` / ``lower`` / ``upper``. See each
            feature class's docstring for details.
        """
        f: _Feature
        if type == "categorical":
            f = _CategoricalFeature(
                name, regularization=regularization, constraints=constraints
            )
        elif type == "linear":
            f = _LinearFeature(
                name,
                transform,
                regularization=regularization,
                constraints=constraints,
            )
        elif type == "spline":
            f = _SplineFeature(
                name,
                transform,
                rel_dof if rel_dof is not None else 4.0,
                regularization=regularization,
                constraints=constraints,
            )
        else:
            raise ValueError(f"Features of type {type} not supported.")

        self._features[name] = f
        self._num_features += 1

    def fit(
        self,
        X: pd.DataFrame,
        y: npt.NDArray[Any],
        covariate_class_sizes: npt.NDArray[Any] | None = None,
        weights: npt.NDArray[Any] | None = None,
        optimizer: str = "admm",
        smoothing: float = 1.0,
        save_flag: bool = False,
        verbose: bool = False,
        plot_convergence: bool = False,
        max_its: int = 100,
        n_jobs: int = 1,
    ) -> None:
        """Fit the model to data.

        Parameters
        ----------
        X : pandas.DataFrame
            Dataframe of features. The column names must correspond to
            the names of features added to the model. ``X`` may have
            extra columns corresponding to features not included in the
            model; these are silently ignored. Where applicable, the
            data should be "pre-transformation": any transformations
            specified in :meth:`add_feature` are applied here.
        y : array-like
            Response. Depending on the model family, the response may
            need to be in a particular form (for example, for a binomial
            family, ``y`` should contain 0s and 1s); this is not
            checked.
        covariate_class_sizes : array-like, optional
            If observations are grouped into covariate classes, the size
            of those classes should be listed here.
        weights : array-like, optional
            Per-observation weights. Effectively specifies the
            dispersion of each observation.
        optimizer : str
            Optimization method. Currently only ``"admm"`` is supported,
            so this argument has no effect.
        smoothing : float
            Multiplicative scale applied to every feature's
            regularization coefficient. Lets the user set the overall
            smoothing by cross-validation while keeping the relative
            regularization across features fixed. Defaults to ``1.0``,
            which leaves the regularization as specified in
            :meth:`add_feature`.
        save_flag : bool
            Whether to save intermediate results after each iteration.
            Useful for complicated models with massive data sets that
            take a while to fit; if the system crashes during the fit
            the analyst can resume from the last checkpoint. Defaults to
            ``False``.
        verbose : bool
            Print mildly useful information during the fit. Defaults to
            ``False``.
        plot_convergence : bool
            Plot the convergence graph at the end of the fit. Defaults
            to ``False``.
        max_its : int
            Maximum number of ADMM iterations. Defaults to 100.
        n_jobs : int
            Number of threads to use for the per-feature primal step
            within each ADMM iteration. Defaults to 1 (serial); pass
            ``-1`` to use :func:`os.cpu_count`. NumPy / SciPy / cvxpy
            release the GIL during their numeric kernels, so threading
            produces real speedup. Expect a 2-4x ceiling on models with
            several non-trivial features (splines, categoricals via
            cvxpy); pure linear-only models are usually faster serial
            because the per-feature work is too cheap to amortize
            thread-dispatch overhead.

        Notes
        -----
        Many binomial data sets include multiple observations with
        identical features. For example, a data set with features
        ``gender`` and ``country`` and a binary survival response might
        be presented in the compact form

        ====== ======= ======== =========
        gender country patients survivors
        ====== ======= ======== =========
        M      USA     50       48
        F      USA     70       65
        M      CAN     40       38
        F      CAN     45       43
        ====== ======= ======== =========

        This is still a binomial family, just more compact. The compact
        format is not yet supported; in this context it is important to
        check for over-dispersion (see [GLM]_). The current
        implementation assumes no over-dispersion and that the number of
        observations sharing a feature pattern is small.
        """
        if n_jobs == -1:
            n_jobs = os.cpu_count() or 1
        if n_jobs < 1:
            raise ValueError(f"n_jobs must be >= 1 or -1; got {n_jobs}")
        # No point in more workers than features.
        n_jobs = min(n_jobs, max(self._num_features, 1))

        if save_flag and self._name is None:
            msg = "Cannot save a GAM with no name."
            msg += " Specify name when instantiating model."
            raise ValueError(msg)

        if len(X) != len(y):
            raise ValueError("Inconsistent number of observations in X and y.")
        if weights is not None and len(weights) != len(y):
            raise ValueError(
                f"weights has length {len(weights)} but y has length {len(y)}."
            )
        if covariate_class_sizes is not None and len(covariate_class_sizes) != len(y):
            raise ValueError(
                f"covariate_class_sizes has length {len(covariate_class_sizes)} "
                f"but y has length {len(y)}."
            )

        self._rho = 0.1
        eps_abs = 1e-3
        eps_rel = 1e-3

        # X may have extra columns not registered as features; ignore them. The
        # real number of features is self._num_features.
        self._num_obs, _num_features_in_X = X.shape
        self._y = np.asarray(y).flatten()
        self._weights = weights

        if covariate_class_sizes is not None:
            self._has_covariate_classes = True
            self._covariate_class_sizes = covariate_class_sizes
            mean_response = float(np.sum(self._y)) / float(
                np.sum(self._covariate_class_sizes)
            )
            self._offset = float(self._eval_link(mean_response))
        else:
            self._has_covariate_classes = False
            self._covariate_class_sizes = None
            if self._family == "quantile":
                # The offset anchors the model's average prediction over
                # training data, since features are mean-zero. For
                # quantile regression that anchor must be the marginal
                # tau-quantile, not the mean.
                assert self._tau is not None
                self._offset = float(np.quantile(self._y, self._tau))
            elif self._family == "huber":
                # Huber loss is robust; the median is the L1 limit of
                # its M-estimator and a sensible anchor regardless of
                # delta.
                self._offset = float(np.median(self._y))
            else:
                self._offset = float(self._eval_link(np.mean(self._y)))

        fj: dict[str, FloatArray] = {}
        for name, feature in self._features.items():
            feature.initialize(
                np.asarray(X[name].values),
                smoothing=smoothing,
                covariate_class_sizes=self._covariate_class_sizes,
                save_flag=save_flag,
                save_prefix=self._name,
            )
            fj[name] = np.zeros(self._num_obs)

        self.f_bar = np.full((self._num_obs,), self._offset / self._num_features)
        self.z_bar = np.zeros(self._num_obs)
        self.u = np.zeros(self._num_obs)

        self.prim_res = []
        self.dual_res = []
        self.prim_tol = []
        self.dual_tol = []
        self.dev = []

        self._pool = ThreadPoolExecutor(max_workers=n_jobs) if n_jobs > 1 else None
        try:
            self._admm_loop(max_its, eps_abs, eps_rel, fj, verbose)
        finally:
            if self._pool is not None:
                self._pool.shutdown(wait=True)
                self._pool = None

        self._fitted = True
        if save_flag:
            self._save()
        if plot_convergence:
            _plot_convergence(
                self.prim_res, self.prim_tol, self.dual_res, self.dual_tol, self.dev
            )

    def _admm_loop(
        self,
        max_its: int,
        eps_abs: float,
        eps_rel: float,
        fj: dict[str, FloatArray],
        verbose: bool,
    ) -> None:
        """Run the ADMM iterations.

        Extracted from ``fit()`` so the ThreadPoolExecutor lifecycle
        (set up before the loop, torn down after) can be expressed as a
        simple try/finally without re-indenting the loop body.
        """
        z_new = np.zeros(self._num_obs)
        for i in range(max_its):
            if verbose:
                print(f"Iteration {i:d}")
                print("Optimizing primal variables")

            fpumz = self._num_features * (self.f_bar + self.u - self.z_bar)
            fj_new: dict[str, FloatArray] = {}
            f_new = np.full((self._num_obs,), self._offset)
            self._optimize_features(fpumz, fj_new, f_new, verbose)
            f_new /= self._num_features

            if verbose:
                print("Optimizing dual variables")

            z_new = self._optimize(self.u + f_new, self._num_features)
            self.u += f_new - z_new

            prim_res = np.sqrt(self._num_features) * linalg.norm(f_new - z_new)
            dual_res = 0.0
            norm_ax = 0.0
            norm_bz = 0.0
            norm_aty = 0.0
            num_params = 0
            for name, feature in self._features.items():
                dr = (
                    (fj_new[name] - fj[name])
                    + (z_new - self.z_bar)
                    - (f_new - self.f_bar)
                )
                dual_res += dr.dot(dr)
                norm_ax += fj_new[name].dot(fj_new[name])
                zik = fj_new[name] + z_new - f_new
                norm_bz += zik.dot(zik)
                norm_aty += feature.compute_dual_tol(self.u)
                num_params += feature.num_params()

            dual_res = self._rho * np.sqrt(dual_res)
            norm_ax = np.sqrt(norm_ax)
            norm_bz = np.sqrt(norm_bz)
            norm_aty = np.sqrt(norm_aty)

            self.f_bar = f_new
            fj = fj_new
            self.z_bar = z_new

            if self._has_covariate_classes:
                sccs = np.sum(self._covariate_class_sizes)
                prim_tol = np.sqrt(
                    sccs * self._num_features
                ) * eps_abs + eps_rel * np.max([norm_ax, norm_bz])
            else:
                prim_tol = np.sqrt(
                    self._num_obs * self._num_features
                ) * eps_abs + eps_rel * np.max([norm_ax, norm_bz])
            dual_tol = np.sqrt(num_params) * eps_abs + eps_rel * norm_aty

            self.prim_res.append(prim_res)
            self.dual_res.append(dual_res)
            self.prim_tol.append(prim_tol)
            self.dual_tol.append(dual_tol)
            self.dev.append(self.deviance())

            if prim_res < prim_tol and dual_res < dual_tol:
                if verbose:
                    print("Fit converged")
                break
        else:
            if verbose:
                print("Fit did not converge")

    def _optimize_features(
        self,
        fpumz: FloatArray,
        fj_new: dict[str, FloatArray],
        f_new: FloatArray,
        verbose: bool,
    ) -> None:
        """Per-feature primal step.

        Populates ``fj_new`` and accumulates into ``f_new`` in-place.
        Uses ``self._pool`` (a ``ThreadPoolExecutor`` set up by
        ``fit()``) when present. ``self._features`` insertion order is
        preserved on both the serial and parallel paths so ``f_new``
        accumulates terms in the same order -- floating-point addition
        is not associative, so this matters for serial-vs-parallel
        bit-parity tests.
        """
        if self._pool is None:
            for name, feature in self._features.items():
                if verbose:
                    print(f"Optimizing {name:s}")
                fj_new[name] = feature.optimize(fpumz, self._rho)
                f_new += fj_new[name]
        else:
            if verbose:
                print(f"Optimizing {self._num_features:d} features in parallel")
            futures = {
                name: self._pool.submit(feature.optimize, fpumz, self._rho)
                for name, feature in self._features.items()
            }
            for name, fut in futures.items():
                fj_new[name] = fut.result()
                f_new += fj_new[name]

    def _optimize(self, upf: FloatArray, N: int) -> FloatArray:
        r"""Optimize :math:`\bar{z}` (the ADMM dual step).

        Solves the optimization problem

        .. math::

            \min_z\; L(N z) + \frac{\rho}{2}\, \| N z - N u - N \bar{f} \|_2^2

        where :math:`z` is the variable, :math:`N` is the number of
        features, :math:`u` is the scaled dual variable, :math:`\bar{f}`
        is the average feature response, and :math:`L` is the
        family-and-link-specific negative log-likelihood. The minimizer
        is computed via a proximal operator (see [GAMADMM]_):

        .. math::

            \mathrm{prox}_\mu(v) := \arg\min_x\, L(x)
            + \frac{\mu}{2}\, \| x - v \|_2^2.

        We compute :math:`(1 / N) \cdot \mathrm{prox}_\mu(N (u +
        \bar{f}))` with :math:`\mu = \rho`, rather than the paper's
        :math:`\mu = \rho / N`; convergence is much faster this way.
        Several ``(family, link)`` pairs admit closed-form proximal
        operators, making this step very fast.

        Parameters
        ----------
        upf : ndarray
            Vector representing :math:`u + \bar{f}`.
        N : int
            Number of features.

        Returns
        -------
        z : ndarray
            Result of the above optimization.
        """
        # ``__init__`` already restricted (family, link) to the
        # SUPPORTED_FAMILY_LINK_PAIRS allow-list, so every branch below
        # has a dedicated convex prox; there is no fallback path.
        # Different prox operators have slightly different signatures
        # (the binomial variant takes a covariate-class-sizes argument,
        # quantile / huber take an extra shape parameter), so the
        # dispatch is typed as Any to keep mypy quiet here. Quasi
        # families share the score function with their base family, so
        # we dispatch on ``_base_family`` here.
        prox: Any
        if self._base_family == "normal":
            prox = po._prox_normal_identity
        elif self._base_family == "binomial":
            prox = po._prox_binomial_logit
            if self._has_covariate_classes:
                return (1.0 / N) * prox(
                    N * upf,
                    self._rho,
                    self._y,
                    self._covariate_class_sizes,
                    self._weights,
                    self._eval_inv_link,
                )
        elif self._base_family == "poisson":
            prox = po._prox_poisson_log
        elif self._base_family == "gamma":
            prox = po._prox_gamma_reciprocal
        elif self._base_family == "inverse_gaussian":
            prox = po._prox_inv_gaussian_reciprocal_squared
        elif self._base_family == "quantile":
            return (1.0 / N) * po._prox_quantile_identity(
                N * upf,
                self._rho,
                self._y,
                self._tau,  # type: ignore[arg-type]
                w=self._weights,
            )
        elif self._base_family == "huber":
            return (1.0 / N) * po._prox_huber_identity(
                N * upf,
                self._rho,
                self._y,
                self._delta,  # type: ignore[arg-type]
                w=self._weights,
            )
        else:
            raise ValueError(
                f"Family {self._family!s} and Link Function {self._link!s} "
                "not (yet) supported."
            )

        return (1.0 / N) * prox(
            N * upf, self._rho, self._y, w=self._weights, inv_link=self._eval_inv_link
        )
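
    # Intuition for the z-step above, using the simplest case: for the
    # normal family with identity link the prox subproblem separates per
    # observation and has the scalar closed form
    #     prox_rho(v)_i = (w_i * y_i + rho * v_i) / (w_i + rho)
    # (w_i = 1 when unweighted) -- a sketch of the closed form that
    # ``po._prox_normal_identity`` is assumed to implement.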

    def predict(self, X: pd.DataFrame) -> FloatArray:
        """Apply the fitted model to features.

        Parameters
        ----------
        X : pandas.DataFrame
            Data for which to predict the response. The column names
            must correspond to the names of the features used to fit the
            model. ``X`` may have extra columns corresponding to
            features not in the model; these are silently ignored. Where
            applicable, the data should be "pre-transformation":
            transformations specified at model definition time are
            applied here.

        Returns
        -------
        mu : ndarray
            Predicted mean response for each row of ``X``.
        """
        if not self._fitted:
            raise AttributeError("Model not yet fit.")

        num_points, _ = X.shape
        eta = np.full((num_points,), self._offset)
        for name, feature in self._features.items():
            eta = eta + feature.predict(np.asarray(X[name].values))
        return np.asarray(self._eval_inv_link(eta), dtype=float)

    def _design_matrix(
        self, X: pd.DataFrame | None = None
    ) -> tuple[FloatArray, list[str]]:
        """Build a full-rank design matrix used by ``robust_covariance``.

        Parametrization (matches ``statsmodels`` / ``patsy`` defaults):

        * column 0: intercept (all ones)
        * one column per linear feature: the (transformed) raw values
          ``transform(x)`` -- *not* mean-centered. Centering is a
          reparametrization of the intercept and does not change the
          fitted ``mu`` or the inferential conclusions on the linear
          predictor; using raw columns makes the coefficient align with
          ``statsmodels`` term-by-term.
        * for each categorical feature with ``K`` levels: ``K - 1``
          indicator columns dropping the lexicographically-first level
          (treatment contrast), again matching ``statsmodels`` /
          ``patsy``.

        Spline features and binomial covariate classes are not yet
        wired up here and raise ``NotImplementedError``.

        Parameters
        ----------
        X : pandas DataFrame or None
            If ``None``, builds the design matrix from the training data
            already absorbed by the features. If a frame is supplied,
            its columns are looked up by feature name and the
            per-feature design block is constructed at those values
            (with the same dropped-category as training).

        Returns
        -------
        D : (n, p) ndarray
        names : list of str
            Column names: ``"Intercept"`` followed by per-feature labels
            (e.g. ``"purchases"``, ``"country[T.GBR]"``).
        """
        if not self._fitted:
            raise AttributeError("Model not yet fit.")
        if self._has_covariate_classes:
            raise NotImplementedError(
                "Inferential design matrix not yet supported for binomial "
                "covariate classes."
            )

        if X is None:
            n = self._num_obs
        else:
            n = len(X)

        cols: list[FloatArray] = [np.ones(n, dtype=float)]
        names: list[str] = ["Intercept"]
        for fname, feature in self._features.items():
            if isinstance(feature, _LinearFeature):
                if X is None:
                    block = np.asarray(feature._x + feature._xmean, dtype=float)
                else:
                    raw = np.asarray(X[fname].values)
                    if feature._has_transform:
                        block = np.asarray(feature._transform(raw), dtype=float)
                    else:
                        block = raw.astype(float)
                cols.append(block)
                names.append(fname)
            elif isinstance(feature, _CategoricalFeature):
                cats = sorted(feature._categories, key=str)
                if X is None:
                    raw = np.array(
                        [feature._categories[idx] for idx in feature.x], dtype=object
                    )
                else:
                    raw = np.asarray(X[fname].values)
                # Drop the lexicographically-first level (treatment contrast).
                for level in cats[1:]:
                    cols.append(np.asarray(raw == level, dtype=float))
                    names.append(f"{fname}[T.{level}]")
            else:
                raise NotImplementedError(
                    f"Feature {fname!r} of type {feature.__type__!r} is not yet "
                    "supported by robust_covariance / _design_matrix. Currently "
                    "only linear and categorical features are wired up."
                )

        return np.column_stack(cols), names

    def _glm_irls_terms(
        self,
        y: npt.NDArray[Any],
        mu: FloatArray,
    ) -> tuple[FloatArray, FloatArray]:
        """Per-observation IRLS weight ``w`` and eta-scale score ``s``.

        For a GLM with linear predictor ``eta``, mean
        ``mu = g^{-1}(eta)`` and variance function ``V(mu)``, the score
        and Fisher information with respect to ``eta`` are

            s_i = (y_i - mu_i) * (dmu/deta)_i / V(mu_i)
            w_i = (dmu/deta)_i^2 / V(mu_i)

        Together they give the sandwich pieces ``X' diag(w) X`` (bread)
        and ``X' diag(s^2) X`` (meat).
        """
        if self._base_family == "normal":
            v = np.ones_like(mu, dtype=float)
        elif self._base_family == "binomial":
            eps = np.finfo(float).eps
            mu_c = np.clip(mu, eps, 1.0 - eps)
            v = mu_c * (1.0 - mu_c)
        elif self._base_family == "poisson":
            v = mu
        elif self._base_family == "gamma":
            v = mu * mu
        elif self._base_family == "inverse_gaussian":
            v = mu * mu * mu
        else:
            raise NotImplementedError(
                f"robust_covariance does not yet support the {self._family!r} "
                "family. Supported: normal, binomial, poisson, gamma, "
                "inverse_gaussian, quasi_binomial, quasi_poisson."
            )

        if self._link == "identity":
            dmu_deta = np.ones_like(mu, dtype=float)
        elif self._link == "log":
            dmu_deta = mu
        elif self._link == "logistic":
            eps = np.finfo(float).eps
            mu_c = np.clip(mu, eps, 1.0 - eps)
            dmu_deta = mu_c * (1.0 - mu_c)
        elif self._link == "probit":
            # eta = Phi^{-1}(mu); dmu/deta = phi(eta)
            eta = stats.norm.ppf(np.clip(mu, 1e-12, 1.0 - 1e-12))
            dmu_deta = stats.norm.pdf(eta)
        elif self._link == "complementary_log_log":
            mu_c = np.clip(mu, 1e-12, 1.0 - 1e-12)
            eta = np.log(-np.log(1.0 - mu_c))
            dmu_deta = np.exp(eta) * (1.0 - mu_c)
        elif self._link == "reciprocal":
            dmu_deta = -mu * mu
        elif self._link == "reciprocal_squared":
            dmu_deta = -0.5 * mu * mu * mu
        else:
            raise NotImplementedError(
                f"robust_covariance does not yet support the {self._link!r} link."
            )

        weight = dmu_deta * dmu_deta / v
        score = (y - mu) * dmu_deta / v
        return np.asarray(weight, dtype=float), np.asarray(score, dtype=float)

    def robust_covariance(self, cov_type: str = "HC0") -> FloatArray:
        """Sandwich (Huber-White) covariance estimator.

        Returns the heteroscedasticity-consistent covariance for the
        parameter vector defined by ``_design_matrix``:

            V = (X' W X)^{-1} (X' diag(s_i^2) X) (X' W X)^{-1}

        where ``W_ii = (dmu/deta)_i^2 / V(mu_i)`` is the IRLS weight and
        ``s_i = (y_i - mu_i) (dmu/deta)_i / V(mu_i)`` is the
        per-observation score on the ``eta`` scale. ``mu`` is the fitted
        mean from this GAM. The dispersion drops out, so the estimator
        is identical for known and estimated dispersion.

        For canonical-link GLMs without regularization, the result
        matches ``statsmodels.GLM(...).fit(cov_type="HC0").cov_params()``
        on the same parametrization to numerical precision. Penalized
        fits return the *naive* sandwich at the penalized point
        estimate; for proper penalized inference a bread that includes
        the penalty Hessian is needed (future work).

        Parameters
        ----------
        cov_type : ``"HC0"`` or ``"HC1"``
            ``"HC0"`` is the standard White estimator. ``"HC1"``
            multiplies by ``n / (n - p)`` for a small-sample correction
            (matches Stata / statsmodels HC1).

        Returns
        -------
        V : (p, p) ndarray
            Covariance for the parameter vector ``(intercept, linear...,
            categorical contrasts...)``, in the order produced by
            ``_design_matrix()``.

        Raises
        ------
        AttributeError
            If the model has not been fit.
        NotImplementedError
            For spline features, covariate classes, observation weights,
            the quantile or huber families, or unsupported ``cov_type``
            values.
        """
        if cov_type not in ("HC0", "HC1"):
            raise NotImplementedError(
                f"cov_type={cov_type!r} not supported. Use 'HC0' or 'HC1'."
            )
        if not self._fitted:
            raise AttributeError("Model not yet fit.")
        if self._weights is not None:
            raise NotImplementedError(
                "robust_covariance does not yet support observation weights."
            )
        if self._family in ("quantile", "huber"):
            raise NotImplementedError(
                f"robust_covariance does not yet support family={self._family!r}; "
                "M-estimator sandwich SEs are tracked separately."
            )

        D, _ = self._design_matrix()
        mu = self._eval_inv_link(self._num_features * self.f_bar)
        weight, score = self._glm_irls_terms(self._y, mu)

        bread = D.T @ (weight[:, None] * D)
        meat = D.T @ ((score * score)[:, None] * D)
        bread_inv = linalg.inv(bread)
        cov = bread_inv @ meat @ bread_inv

        if cov_type == "HC1":
            n = self._num_obs
            p = D.shape[1]
            if n > p:
                cov = cov * (n / (n - p))
        return np.asarray(cov, dtype=float)
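
    # A hedged cross-check sketch (statsmodels is not a dependency of
    # this module; ``model`` and ``y`` are placeholders for an external
    # test of the claim in the docstring above):
    #
    #     import statsmodels.api as sm
    #     D, names = model._design_matrix()
    #     ref = sm.GLM(y, D, family=sm.families.Poisson())
    #     ref_cov = ref.fit(cov_type="HC0").cov_params()
    #     assert np.allclose(model.robust_covariance("HC0"), ref_cov)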

    def confidence_intervals(
        self, X: pd.DataFrame, prediction: bool = False, width: float = 0.95
    ) -> FloatArray:
        """Confidence intervals on predictions.

        .. note:: Not yet implemented; raises
           :class:`NotImplementedError`.

        Two notions of confidence interval are useful here. The first is
        a confidence interval on :math:`\\mu`, the mean response; this
        captures the uncertainty in the fitted model. The second is a
        confidence interval on observations of this model. For a
        Gaussian family the model might be a perfect fit with billions
        of observations -- so :math:`\\mu` is known precisely and the
        mean-response interval is tiny -- but observations are still
        spread around the mean, so the prediction interval is wider. For
        a binomial family the estimated mean lies in :math:`(0, 1)` and
        admits a confidence interval, but the observed response is
        always 0 or 1 so a "prediction interval" is only meaningful in a
        pedantic sense.

        When making multiple predictions a "global" set of intervals
        (all predictions inside their intervals with the specified joint
        probability) is sometimes desirable. This function does not
        compute global intervals; each interval is computed *in vacuo*.

        Parameters
        ----------
        X : pandas.DataFrame
            Data for which to predict the response. The column names
            must correspond to the names of the features used to fit the
            model. ``X`` may have extra columns corresponding to
            features not in the model; these are silently ignored. Where
            applicable, the data should be "pre-transformation".
        prediction : bool
            If ``True``, return a confidence interval on the predicted
            response; if ``False``, on the mean response. Defaults to
            ``False``.
        width : float
            Desired confidence width in :math:`(0, 1)`. Defaults to
            ``0.95``.

        Returns
        -------
        bounds : ndarray of shape ``(n, 2)``
            Lower and upper bounds on the confidence interval associated
            with each prediction.
        """
        raise NotImplementedError("confidence_intervals is not yet implemented")

    def plot(
        self,
        name: str,
        true_fn: Callable[[FloatArray], FloatArray] | None = None,
    ) -> None:
        """Plot the model component for a particular feature.

        Parameters
        ----------
        name : str
            Name of the feature; must be one of the features in the
            model.
        true_fn : callable, optional
            Function representing the "true" relationship between the
            feature and the response, overlaid on the plot for
            comparison.
        """
        self._features[name]._plot(true_fn=true_fn)

    def residuals(
        self,
        kind: Literal["response", "pearson", "deviance", "anscombe"] = "deviance",
    ) -> FloatArray:
        """Residuals for the fitted model.

        Parameters
        ----------
        kind : ``"response"``, ``"pearson"``, ``"deviance"``, or ``"anscombe"``
            ``"response"`` returns ``y - mu`` (raw error). ``"pearson"``
            returns the standardized residual
            ``(y - E[y]) / sqrt(Var(y))`` using the family's variance
            function and the estimated dispersion. For binomial with
            covariate classes, ``y`` is the count and ``E[y] = m * mu``.
            ``"deviance"`` returns ``sign(y - E[y]) * sqrt(d_i)``, where
            ``d_i >= 0`` is the per-observation deviance contribution;
            squaring and summing gives the total deviance (Wood 2017
            Sec 3.1.7). ``"anscombe"`` returns the family-specific
            transformation
            ``(A(y) - A(mu)) / (V(mu)^(1/6) * sqrt(phi))`` with
            ``A(t) = integral V(s)^(-1/3) ds``, chosen so the residual
            is approximately standard-normal under correct specification
            (McCullagh & Nelder 1989 Sec 2.4.1; Wood 2017 Sec 3.1.7).

        Returns
        -------
        residuals : array of shape (n_obs,)
        """
        if not self._fitted:
            raise AttributeError("Model not yet fit.")

        y = self._y
        mu = self._eval_inv_link(self._num_features * self.f_bar)
        if self._has_covariate_classes:
            m: npt.NDArray[Any] | float = self._covariate_class_sizes
        else:
            m = 1.0

        if kind == "response":
            return np.asarray(y - mu, dtype=float)
        if kind == "pearson":
            return self._pearson_residuals(y, mu, m)
        if kind == "deviance":
            d_i = self._unit_deviance_for_residuals(y, mu, m)
            if self._base_family == "binomial":
                return np.asarray(np.sign(y - m * mu) * np.sqrt(d_i), dtype=float)
            return np.asarray(np.sign(y - mu) * np.sqrt(d_i), dtype=float)
        if kind == "anscombe":
            return self._anscombe_residuals(y, mu, m)
        raise ValueError(f"Unknown residual kind: {kind!r}")
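
    # Quick diagnostic sketch: under a correctly specified family,
    # deviance residuals are approximately standard normal --
    #
    #     r = model.residuals("deviance")    # ``model`` is a fitted GAM
    #     stats.probplot(r, plot=ax)         # the same check that
    #                                        # plot_residuals() draws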

    def _pearson_residuals(
        self,
        y: npt.NDArray[Any],
        mu: npt.NDArray[Any],
        m: npt.NDArray[Any] | float,
    ) -> FloatArray:
        """Pearson residuals for each family."""
        phi = self.dispersion()
        if self._base_family == "binomial":
            eps = np.finfo(float).eps
            mu_c = np.clip(mu, eps, 1.0 - eps)
            var = m * mu_c * (1.0 - mu_c)
            return np.asarray((y - m * mu_c) / np.sqrt(var * phi), dtype=float)
        if self._base_family == "normal":
            return np.asarray((y - mu) / np.sqrt(phi), dtype=float)
        if self._base_family == "poisson":
            return np.asarray((y - mu) / np.sqrt(mu * phi), dtype=float)
        if self._base_family == "gamma":
            return np.asarray((y - mu) / np.sqrt(mu * mu * phi), dtype=float)
        if self._base_family == "inverse_gaussian":
            return np.asarray((y - mu) / np.sqrt(mu * mu * mu * phi), dtype=float)
        raise ValueError(f"Unsupported family {self._family!r}")

    def _unit_deviance_for_residuals(
        self,
        y: npt.NDArray[Any],
        mu: npt.NDArray[Any],
        m: npt.NDArray[Any] | float,
    ) -> FloatArray:
        """Per-observation deviance contribution ``d_i >= 0``.

        For binomial this differs from the per-term contribution
        accumulated by ``deviance()``: the ``deviance()`` form omits the
        saturated-likelihood constant (which doesn't affect model
        fitting), but ``d_i`` for residuals must include it so that
        ``sqrt(d_i)`` is real and non-negative.
        """
        if self._base_family == "normal":
            return np.asarray((y - mu) ** 2, dtype=float)
        if self._base_family == "binomial":
            eps = np.finfo(float).eps
            mu_c = np.clip(mu, eps, 1.0 - eps)
            with np.errstate(divide="ignore", invalid="ignore"):
                t1 = np.where(y > 0, y * np.log(y / (m * mu_c)), 0.0)
                t2 = np.where(
                    y < m,
                    (m - y) * np.log((m - y) / (m * (1.0 - mu_c))),
                    0.0,
                )
            return np.maximum(2.0 * (t1 + t2), 0.0)
        if self._base_family == "poisson":
            with np.errstate(divide="ignore", invalid="ignore"):
                t = np.where(y > 0, y * np.log(np.where(y > 0, y, 1.0) / mu), 0.0)
            return np.maximum(2.0 * (t - (y - mu)), 0.0)
        if self._base_family == "gamma":
            tiny = np.finfo(float).tiny
            y_safe = np.where(y > 0, y, tiny)
            mu_safe = np.where(mu > 0, mu, tiny)
            return np.maximum(
                2.0 * (-np.log(y_safe / mu_safe) + (y - mu_safe) / mu_safe), 0.0
            )
        if self._base_family == "inverse_gaussian":
            return np.asarray((y - mu) ** 2 / (mu * mu * y), dtype=float)
        raise ValueError(f"Unsupported family {self._family!r}")

    def _anscombe_residuals(
        self,
        y: npt.NDArray[Any],
        mu: npt.NDArray[Any],
        m: npt.NDArray[Any] | float,
    ) -> FloatArray:
        r"""Anscombe residuals using the family-specific
        variance-stabilizing transformation
        ``A(t) = \int V(s)^{-1/3} ds``.

        The residual is

            r_A = (A(y) - A(mu)) / (V(mu)^(1/6) * sqrt(phi))

        with the binomial form scaled by ``sqrt(m)`` because the
        per-trial proportion ``y/m`` has variance ``V(mu)/m``. See
        McCullagh & Nelder (1989) Sec 2.4.1 / Table 2.5.
        """
        phi = self.dispersion()
        if self._base_family == "normal":
            # V(mu) = 1, A(t) = t -- coincides with the Pearson residual.
            return np.asarray((y - mu) / np.sqrt(phi), dtype=float)
        if self._base_family == "binomial":
            eps = np.finfo(float).eps
            mu_c = np.clip(mu, eps, 1.0 - eps)
            prop = np.clip(y / m, eps, 1.0 - eps)
            # A(p) = B(2/3, 2/3) * I(2/3, 2/3, p) where I is the regularized
            # incomplete beta function.
            scale = float(special.beta(2.0 / 3.0, 2.0 / 3.0))
            a_y = scale * special.betainc(2.0 / 3.0, 2.0 / 3.0, prop)
            a_mu = scale * special.betainc(2.0 / 3.0, 2.0 / 3.0, mu_c)
            denom = (mu_c * (1.0 - mu_c)) ** (1.0 / 6.0) * np.sqrt(phi)
            return np.asarray(np.sqrt(m) * (a_y - a_mu) / denom, dtype=float)
        if self._base_family == "poisson":
            # V(mu) = mu, A(t) = (3/2) t^(2/3).
            return np.asarray(
                1.5
                * (y ** (2.0 / 3.0) - mu ** (2.0 / 3.0))
                / mu ** (1.0 / 6.0)
                / np.sqrt(phi),
                dtype=float,
            )
        if self._base_family == "gamma":
            # V(mu) = mu^2, A(t) = 3 t^(1/3); V(mu)^(1/6) = mu^(1/3).
            tiny = np.finfo(float).tiny
            y_safe = np.where(y > 0, y, tiny)
            mu_safe = np.where(mu > 0, mu, tiny)
            return np.asarray(
                3.0
                * (y_safe ** (1.0 / 3.0) - mu_safe ** (1.0 / 3.0))
                / mu_safe ** (1.0 / 3.0)
                / np.sqrt(phi),
                dtype=float,
            )
        if self._base_family == "inverse_gaussian":
            # V(mu) = mu^3, A(t) = log(t); V(mu)^(1/6) = sqrt(mu).
            return np.asarray((np.log(y) - np.log(mu)) / np.sqrt(mu * phi), dtype=float)
        raise ValueError(f"Unsupported family {self._family!r}")

    def plot_residuals(
        self,
        kind: Literal["response", "pearson", "deviance", "anscombe"] = "deviance",
    ) -> Any:
        """Plot residuals vs. fitted values and a normal QQ plot.

        Produces a 1x2 matplotlib figure: residual-vs-fitted on the
        left, Q-Q plot against the standard normal on the right.

        Parameters
        ----------
        kind : ``"response"``, ``"pearson"``, ``"deviance"``, or ``"anscombe"``
            Forwarded to ``residuals()``.

        Returns
        -------
        fig : matplotlib.figure.Figure
        """
        import matplotlib.pyplot as plt

        res = self.residuals(kind)
        mu = self._eval_inv_link(self._num_features * self.f_bar)

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
        ax1.scatter(mu, res, s=10, alpha=0.6)
        ax1.axhline(0.0, color="grey", linewidth=0.5)
        ax1.set_xlabel("Fitted (mu)")
        ax1.set_ylabel(f"{kind.capitalize()} residual")
        ax1.set_title("Residuals vs Fitted")

        stats.probplot(res, plot=ax2)
        ax2.set_title("Normal Q-Q")
        fig.tight_layout()
        return fig

    def plot_residuals_vs_predictor(
        self,
        predictor: npt.ArrayLike,
        *,
        kind: Literal["response", "pearson", "deviance", "anscombe"] = "deviance",
        name: str | None = None,
    ) -> Any:
        """Plot residuals against a predictor.

        Useful for diagnosing fit adequacy: a clear pattern (curvature,
        funnel shape, etc.) when ``predictor`` is a feature already in
        the model suggests an unmodeled non-linearity or
        heteroscedasticity. The same plot for a predictor *not* in the
        model is a quick check on whether to add it.

        Parameters
        ----------
        predictor : array-like of shape ``(n_obs,)``
            Predictor values for each training observation. Must match
            the length of the training data used to fit this model.
            Categorical (string / object) predictors are plotted as a
            categorical x-axis.
        kind : ``"response"``, ``"pearson"``, ``"deviance"``, or ``"anscombe"``
            Forwarded to ``residuals()``.
        name : str, optional
            Label for the x-axis and title.

        Returns
        -------
        fig : matplotlib.figure.Figure
        """
        import matplotlib.pyplot as plt

        res = self.residuals(kind)
        pred = np.asarray(predictor)
        if pred.shape != (self._num_obs,):
            raise ValueError(
                f"predictor has shape {pred.shape}, "
                f"expected ({self._num_obs},) to match training data"
            )

        label = name if name is not None else "predictor"
        fig, ax = plt.subplots(figsize=(6, 4))
        ax.scatter(pred, res, s=10, alpha=0.6)
        ax.axhline(0.0, color="grey", linewidth=0.5)
        ax.set_xlabel(label)
        ax.set_ylabel(f"{kind.capitalize()} residual")
        ax.set_title(f"Residuals vs {label}")
        fig.tight_layout()
        return fig

    def deviance(
        self,
        X: pd.DataFrame | None = None,
        y: npt.NDArray[Any] | None = None,
        covariate_class_sizes: npt.NDArray[Any] | None = None,
        w: npt.NDArray[Any] | None = None,
    ) -> float:
        r"""Deviance of the fitted model.

        With no arguments, returns the training deviance

        .. math::

            D = 2 \phi \bigl(\ell(y; y) - \ell(\mu; y)\bigr),

        where :math:`\phi` is the dispersion (present only to cancel the
        denominator of the log-likelihood), :math:`\ell(y; y)` is the
        log-likelihood of a model that fits the data perfectly, and
        :math:`\ell(\mu; y)` is the log-likelihood of the fitted model
        on its training data. This is the quantity ADMM minimizes during
        fitting.

        With ``X`` and ``y`` supplied, returns the deviance on the
        provided data set. Combined with cross-validation this is the
        usual way to pick the ``smoothing`` parameter by minimizing
        deviance on a hold-out set.

        Parameters
        ----------
        X : pandas.DataFrame, optional
            Dataframe of features. The column names must correspond to
            the names of features added to the model (see
            :meth:`predict`). Only used in the hold-out branch.
        y : array-like, optional
            Response on the hold-out set. Only used in the hold-out
            branch.
        covariate_class_sizes : array-like, optional
            Array of covariate class sizes for the hold-out data.
        w : array-like, optional
            Per-observation weights for the hold-out branch.

        Returns
        -------
        D : float
            The deviance of the model.
        """
        if X is None or y is None:
            y = self._y
            mu = self._eval_inv_link(self._num_features * self.f_bar)
            w = self._weights
            if self._has_covariate_classes:
                m: npt.NDArray[Any] | float = self._covariate_class_sizes
            else:
                m = 1.0
        else:
            mu = self.predict(X)
            if covariate_class_sizes is not None:
                m = covariate_class_sizes
            else:
                m = 1.0

        return self._deviance_from_mu(y, mu, m=m, w=w)
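
    # Hold-out smoothing search, as the docstring above suggests
    # (sketch; ``build_model()`` is a hypothetical helper returning a
    # fresh, identically configured GAM):
    #
    #     best_s, best_dev = None, np.inf
    #     for s in (0.25, 0.5, 1.0, 2.0, 4.0):
    #         m = build_model()
    #         m.fit(X_train, y_train, smoothing=s)
    #         d = m.deviance(X_holdout, y_holdout)
    #         if d < best_dev:
    #             best_s, best_dev = s, d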
    def _deviance_from_mu(
        self,
        y: npt.NDArray[Any],
        mu: npt.NDArray[Any],
        m: npt.NDArray[Any] | float = 1.0,
        w: npt.NDArray[Any] | None = None,
    ) -> float:
        """Compute the deviance for a given response and mean, with
        optional covariate-class sizes and weights."""
        if self._base_family == "normal":
            y_minus_mu = y - mu
            if w is None:
                return float(y_minus_mu.dot(y_minus_mu))
            return float(w.dot(y_minus_mu * y_minus_mu))

        if self._base_family == "binomial":
            # Clip mu away from 0 and 1 so log(mu) and log1p(-mu) stay finite.
            eps = np.finfo(float).eps
            mu_c = np.clip(mu, eps, 1.0 - eps)
            if w is None:
                return float(
                    -2.0 * np.sum(y * np.log(mu_c) + (m - y) * np.log1p(-mu_c))
                )
            return float(-2.0 * w.dot(y * np.log(mu_c) + (m - y) * np.log1p(-mu_c)))

        if self._base_family == "poisson":
            # `y * log(y / mu)` has the conventional limit 0 when y = 0.
            y_log_term = np.where(y > 0, y * np.log(np.where(y > 0, y, 1.0) / mu), 0.0)
            term = y_log_term - (y - mu)
            if w is None:
                return float(2.0 * np.sum(term))
            return float(2.0 * w.dot(term))

        if self._base_family == "gamma":
            # Gamma deviance is undefined at y = 0 or mu <= 0 in the strict
            # sense; clip both to a tiny positive number to keep training
            # iterations from blowing up when mu transiently goes negative
            # (gamma + reciprocal link does not constrain mu).
            tiny = np.finfo(float).tiny
            y_safe = np.where(y > 0, y, tiny)
            mu_safe = np.where(mu > 0, mu, tiny)
            if w is None:
                return float(
                    2.0
                    * np.sum(-1.0 * np.log(y_safe / mu_safe) + (y - mu_safe) / mu_safe)
                )
            return float(
                2.0 * w.dot(-1.0 * np.log(y_safe / mu_safe) + (y - mu_safe) / mu_safe)
            )

        if self._base_family == "inverse_gaussian":
            if w is None:
                return float(np.sum((y - mu) * (y - mu) / (mu * mu * y)))
            return float(w.dot((y - mu) * (y - mu) / (mu * mu * y)))

        if self._base_family == "quantile":
            # Pinball loss: rho_tau(r) = max(tau*r, (tau-1)*r). The
            # quantile family has no proper likelihood, so "deviance"
            # here is twice the empirical pinball loss -- the
            # M-estimator analogue of -2 * log-likelihood used by the
            # ADMM convergence trace and goodness-of-fit summaries.
            tau = float(self._tau)  # type: ignore[arg-type]
            r = y - mu
            term = np.maximum(tau * r, (tau - 1.0) * r)
            if w is None:
                return float(2.0 * np.sum(term))
            return float(2.0 * w.dot(term))

        if self._base_family == "huber":
            # Huber loss is an M-estimator without a proper likelihood;
            # report 2 * sum(L_delta(y - mu)) so the inner (quadratic)
            # region matches normal-family deviance and the convergence
            # trace stays comparable across runs.
            delta = float(self._delta)  # type: ignore[arg-type]
            r = y - mu
            abs_r = np.abs(r)
            term = np.where(abs_r <= delta, 0.5 * r * r, delta * (abs_r - 0.5 * delta))
            if w is None:
                return float(2.0 * np.sum(term))
            return float(2.0 * w.dot(term))

        raise ValueError(f"Unsupported family {self._family!r}")
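    # A standalone numerical check of the Poisson branch above
    # (illustrative; uses only the module-level numpy import). At y = 0
    # the conventional limit of ``y * log(y / mu)`` is 0, so the unit
    # deviance reduces to ``2 * mu``.
    @staticmethod
    def _example_poisson_deviance_at_zero() -> None:
        y = np.array([0.0, 2.0])
        mu = np.array([0.5, 2.0])
        y_log_term = np.where(y > 0, y * np.log(np.where(y > 0, y, 1.0) / mu), 0.0)
        dev = 2.0 * np.sum(y_log_term - (y - mu))
        # The first observation contributes 2 * mu[0] = 1.0; the second,
        # with y == mu, contributes 0.
        assert np.isclose(dev, 1.0)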
[docs]    def dispersion(self, formula: str = "deviance") -> float:
        """Dispersion of the fitted model.

        Returns the dispersion associated with the model. Some
        families (binomial, poisson) have a fixed dispersion; for
        others the dispersion is estimated from the data. Different
        estimators may be appropriate for different families. The
        default implementation is the deviance-based estimator of
        [GAMr]_, Eqn. 3.10 on p. 110. That section notes the estimator
        works poorly for overdispersed Poisson data with a small mean
        response and offers alternatives that have not yet been
        implemented; when overdispersion is being estimated, the
        Pearson-based ``_poisson_overdispersion`` estimator is used
        instead.

        Parameters
        ----------
        formula : str
            Formula for the dispersion. Options:

            * ``"deviance"`` (default)
            * ``"pearson"``
            * ``"fletcher"``

            The argument is not yet consulted; each family dispatches
            to the estimator hard-coded in its branch below.
        """
        if self._family == "normal":
            if self._known_dispersion:
                return float(self._dispersion)
            return float(self.deviance() / (self._num_obs - self.dof()))

        if self._family == "binomial":
            if self._known_dispersion:
                return float(self._dispersion)
            if self._estimate_overdispersion:
                return float(self._binomial_overdispersion())
            return 1.0

        if self._family == "poisson":
            if self._known_dispersion:
                return float(self._dispersion)
            if self._estimate_overdispersion:
                return float(self._poisson_overdispersion())
            return 1.0

        if self._family == "quasi_poisson":
            # Quasi-likelihood: same score as Poisson, dispersion phi is
            # always estimated from data via Pearson chi-squared / (n - p).
            # The user can still pin phi by passing ``dispersion=`` to
            # ``GAM(...)``.
            if self._known_dispersion:
                return float(self._dispersion)
            return float(self._poisson_overdispersion())

        if self._family == "quasi_binomial":
            # Quasi-likelihood: same score as binomial, dispersion
            # estimated by Pearson chi-squared / (n - p) (with the
            # binomial variance function ``mu * (1 - mu)``, scaled by
            # ``m`` for covariate-class data).
            if self._known_dispersion:
                return float(self._dispersion)
            return float(self._binomial_pearson_dispersion())

        if self._family == "gamma":
            if self._known_dispersion:
                return float(self._dispersion)
            return _gamma_dispersion(self.dof(), self.deviance(), self._num_obs)

        if self._family == "inverse_gaussian":
            if self._known_dispersion:
                return float(self._dispersion)
            return float(self.deviance() / (self._num_obs - self.dof()))

        if self._family == "quantile":
            # Pinball loss is an M-estimator, not a likelihood; there
            # is no dispersion to estimate.
            return 1.0

        if self._family == "huber":
            # Huber is an M-estimator. There is no dispersion in the
            # likelihood sense; sandwich SEs would be the right
            # inferential tool (#34) but are out of scope here.
            return 1.0

        raise ValueError(f"Unsupported family {self._family!r}")
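    # The deviance-based estimator used by the normal and
    # inverse-Gaussian branches above, written out standalone with
    # hypothetical stand-ins for ``deviance()``, ``_num_obs``, and
    # ``dof()``:
    @staticmethod
    def _example_deviance_dispersion() -> float:
        dev, n_obs, edof = 42.0, 100.0, 7.5
        return dev / (n_obs - edof)  # phi_hat = D / (n - p) ~= 0.454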
    def _binomial_overdispersion(self, formula: str | None = None) -> float:
        r"""Estimate binomial over-dispersion.

        Parameters
        ----------
        formula : str, optional
            Which formula to use, either ``"replication"`` or
            ``"pearson"``. See *Notes*.

        Returns
        -------
        sigma2 : float
            Estimate of over-dispersion. Cached on ``self._dispersion``
            so repeated calls are O(1).

        Notes
        -----
        When using covariate classes, the observed variance may exceed
        the baseline for the family due to clustering in the
        population (see [GLM]_). That text gives two methodologies for
        estimating over-dispersion. Without covariate classes,
        over-dispersion cannot be estimated.

        The most reliable assessment of over-dispersion is only
        possible when there is replication amongst the covariate
        classes. As an example, suppose we have data on patients from
        two hospitals as in the following table. Note that there are
        three rows for males in hospital 1 -- these could be pooled,
        but keeping them separate lets us estimate over-dispersion
        more reliably.

        ======== ========== ========== ===========
        Gender   Hospital   Patients   Survivors
        ======== ========== ========== ===========
        M        1          30         15
        M        1          40         19
        M        1          35         15
        F        1          10         8
        M        2          10         3
        M        2          18         6
        F        2          40         30
        ======== ========== ========== ===========

        Because we are building a model based on gender and hospital
        alone, we assume the three M/1 entries are drawn from the same
        binomial distribution; that hypothesis can be tested with, for
        example, Welch's t-test. A significant departure from the null
        hypothesis indicates an unobserved source of between-replicate
        variation -- different doctors, different time periods, etc.
        Quantifying the additional variance helps produce more
        accurate confidence intervals.

        When replication is present, [GLM]_ suggests the following.
        Suppose a covariate class (e.g. ``Gender=M, Hospital=1``) has
        :math:`r` replicates. Across all :math:`r` replicates,
        determine the observed success rate :math:`\pi`. In the
        example we have 105 patients and 49 survivors, for
        :math:`\pi = 0.47`. We then compute the variance on
        :math:`r - 1` DOF,

        .. math::

           s^2 = \frac{1}{r - 1} \sum_{j=1}^r
                 \frac{(y_j - m_j \pi)^2}{m_j \pi (1 - \pi)},

        where :math:`y_j` is the number of successes in the
        :math:`j`-th replicate and :math:`m_j` the number of trials.
        Per [GLM]_ this is an unbiased estimate of the dispersion
        parameter; on these numbers it gives :math:`s^2 = 0.17`,
        indicating under-dispersion. (These numbers are made up;
        over-dispersion is more common than under-dispersion in
        practice.)

        Each covariate class with replication yields its own
        dispersion estimate. Assuming the dispersion is independent of
        the covariate classes (which may or may not be true) we can
        pool by replication degree. With the :math:`k`-th class having
        :math:`r_k` replicates and dispersion estimate :math:`s_k^2`,
        the overall estimate is

        .. math::

           s^2 = \frac{\sum_k (r_k - 1)\, s_k^2}{\sum_k (r_k - 1)}.

        This pooling formula is *not* in [GLM]_; that text just says
        to pool the estimates without specifying how. This approach
        makes sense, but that doesn't make it correct.

        When replication is sparse, the methodology above breaks down.
        [GLM]_ then advocates the Pearson-residual approach: with
        :math:`\pi_j` the model prediction for the :math:`j`-th
        covariate class,

        .. math::

           s^2 = \frac{1}{n - p} \sum_j
                 \frac{(y_j - m_j \pi_j)^2}{m_j \pi_j (1 - \pi_j)}.

        This uses the model prediction instead of the pooled
        observation, and :math:`n - p` for the error DOF instead of
        the number of replicates. It still breaks down when covariate
        class sizes :math:`m_j` are small.
        To use the replicate-based formula, at least one covariate
        class must show replication, and the degree of replication
        must be at least two. If those conditions fail and the user
        requested the replicate-based formula, the directive is
        silently ignored and the Pearson-based approach used.

        If ``formula`` is not specified, the criterion for using
        replication is: at least two covariate classes show
        replication; the most-replicated class has degree at least 3;
        and the total replication DOF is at least 10. In the example
        above, the replicated classes are M/1 and M/2 with replication
        3 and 2, for a total DOF of :math:`(3-1) + (2-1) = 3` -- below
        the threshold of 10 -- so the Pearson formula is used. These
        criteria are arbitrary and would benefit from further
        research.
        """
        if not self._has_covariate_classes:
            return 1.0

        min_cc_replicates = 1
        min_replication = 2
        des_cc_replicates = 2
        des_replication = 3
        des_replication_dof = 10

        # Determine the degree of replication.
        #
        # To use the replication formula, we need at least one
        # covariate class with replication, and that covariate class
        # needs replication of at least 2. It might make sense to use
        # a more stringent set of criteria, but this is enough for
        # now.
        #
        # The way we decide whether two observations have the same
        # covariate class is by encoding the covariate class as an
        # index. Each categorical feature has already indexed each
        # category by an internal integer between 0 and n_k - 1, where
        # n_k is the number of categories of the kth feature. (None of
        # this is applicable unless all the features are categorical.)
        #
        # We use these internal indices along with the numbers of
        # categories in conjunction with the numpy ravel_multi_index
        # function to map a tuple of category indices into a single
        # integer between 0 and the product of all category sizes
        # (minus 1).
        #
        # We need to take care to loop over the features in a
        # consistent order, so we create the fnames array just to give
        # an arbitrary but consistent ordering.
        r: dict[int, int] = {}
        covariate_class = np.zeros((self._num_obs,), dtype=np.int64)
        fnames = sorted(self._features)
        for i in range(self._num_obs):
            multi_index = []
            dims = []
            for fname in fnames:
                # Overdispersion is only meaningful when every feature
                # is categorical, so a runtime AttributeError on a
                # non-categorical feature is the correct behavior.
                feat = self._features[fname]
                cindex, csize = feat.category_index(i)  # type: ignore[attr-defined]
                multi_index.append(cindex)
                dims.append(csize)
            cci = int(np.ravel_multi_index(multi_index, dims))
            covariate_class[i] = cci
            r[cci] = r.get(cci, 0) + 1

        num_cc_with_replicates = 0
        max_replication = 0
        replication_dof = 0
        for j in r.values():
            if j > 1:
                num_cc_with_replicates += 1
                replication_dof += j - 1
                if j > max_replication:
                    max_replication = j

        has_replication = (
            num_cc_with_replicates >= min_cc_replicates
            and max_replication >= min_replication
        )
        has_desired_replication = (
            num_cc_with_replicates >= des_cc_replicates
            and max_replication >= des_replication
            and replication_dof >= des_replication_dof
        )

        if formula is None:
            formula = "replication" if has_desired_replication else "pearson"

        if has_replication and formula == "replication":
            trials: dict[int, float] = {}
            successes: dict[int, float] = {}
            for i in range(self._num_obs):
                cci = int(covariate_class[i])
                trials[cci] = trials.get(cci, 0.0) + float(
                    self._covariate_class_sizes[i]
                )
                successes[cci] = successes.get(cci, 0.0) + float(self._y[i])

            s2 = 0.0
            for i in range(self._num_obs):
                cci = int(covariate_class[i])
                pi = successes[cci] / trials[cci]
                num = self._y[i] - self._covariate_class_sizes[i] * pi
                denom = self._covariate_class_sizes[i] * pi * (1 - pi)
                # pi in {0, 1} forces num == 0 as well; skip the 0/0.
                if denom > 0.0:
                    s2 += float(num * num / denom)
            s2 /= replication_dof
        else:
            mu = self._eval_inv_link(self._num_features * self.f_bar)
            m_arr = self._covariate_class_sizes
            bl_var = np.multiply(mu, 1.0 - mu)
            res = self._y - np.multiply(m_arr, mu)
            num_p = np.multiply(res, res)
            denom_p = np.multiply(m_arr, bl_var)
            n_minus_p = self._num_obs - self.dof()
            s2 = float(np.sum(np.divide(num_p, denom_p)) / n_minus_p)

        self._known_dispersion = True
        self._dispersion = s2
        return s2

    def _poisson_overdispersion(self) -> float:
        r"""Estimate the Poisson dispersion as Pearson chi-squared / DOF.

        Under the standard Poisson model, ``Var(Y) = mu``; if the
        observed variance is larger (overdispersion), the dispersion
        ``phi`` is estimated as

        .. math::

           \hat\phi = \frac{1}{n - p} \sum_i \frac{(y_i - \mu_i)^2}{\mu_i},

        where ``p = dof()``. This is the Pearson form of Eqn. 3.10 in
        Wood (2017). The replication-based estimator used for
        ``_binomial_overdispersion`` is binomial-specific (it relies
        on covariate-class replicates) and is not applicable here.

        The result is cached on ``self._dispersion`` and
        ``self._known_dispersion`` so subsequent calls are O(1).
        """
        mu = self._eval_inv_link(self._num_features * self.f_bar)
        # Guard against transient mu <= 0 from a non-canonical link.
        tiny = np.finfo(float).tiny
        mu_safe = np.where(mu > 0, mu, tiny)
        res = self._y - mu
        n_minus_p = self._num_obs - self.dof()
        s2 = float(np.sum(res * res / mu_safe) / n_minus_p)
        self._known_dispersion = True
        self._dispersion = s2
        return s2

    def _binomial_pearson_dispersion(self) -> float:
        r"""Estimate the binomial dispersion as Pearson chi-squared / DOF.

        Used by ``family='quasi_binomial'``. Matches the Pearson
        branch of ``_binomial_overdispersion``, but works for both
        Bernoulli (no covariate classes) and binomial-counts (with
        covariate classes) data:

        .. math::

           \hat\phi = \frac{1}{n - p} \sum_i
                      \frac{(y_i - m_i \mu_i)^2}{m_i \mu_i (1 - \mu_i)}.

        For Bernoulli data, :math:`m_i = 1`. For an exact-binomial fit
        :math:`\hat\phi` should be near 1; values markedly larger than
        1 indicate overdispersion.
        The estimator is undefined when ``mu_i in {0, 1}``; we clip
        ``mu_i`` away from those endpoints so transient
        near-saturation during the ADMM loop does not blow up the
        denominator.
        """
        mu = self._eval_inv_link(self._num_features * self.f_bar)
        eps = np.finfo(float).eps
        mu_c = np.clip(mu, eps, 1.0 - eps)
        if self._has_covariate_classes:
            m_arr = self._covariate_class_sizes
            res = self._y - m_arr * mu_c
            denom = m_arr * mu_c * (1.0 - mu_c)
        else:
            res = self._y - mu_c
            denom = mu_c * (1.0 - mu_c)
        n_minus_p = self._num_obs - self.dof()
        s2 = float(np.sum(res * res / denom) / n_minus_p)
        self._known_dispersion = True
        self._dispersion = s2
        return s2
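    # Standalone check of the replication estimator against the worked
    # example in the ``_binomial_overdispersion`` docstring (M/1 class:
    # 105 patients, 49 survivors, s^2 of roughly 0.17). Illustrative
    # only; depends only on the module-level numpy import.
    @staticmethod
    def _example_replication_dispersion() -> float:
        m = np.array([30.0, 40.0, 35.0])  # trials per replicate
        y = np.array([15.0, 19.0, 15.0])  # successes per replicate
        pi = y.sum() / m.sum()  # pooled success rate, ~0.467
        s2 = np.sum((y - m * pi) ** 2 / (m * pi * (1 - pi))) / (len(y) - 1)
        return float(s2)  # ~0.175, the docstring's 0.17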
[docs]    def dof(self) -> float:
        """Degrees of freedom: sum of feature DOFs plus the affine intercept."""
        dof = 1.0  # Affine factor
        for feature in self._features.values():
            dof += feature.dof()
        return dof
[docs]    def aic(self) -> float:
        """Akaike Information Criterion.

        Useful for choosing smoothing parameters. The AIC computed
        here is off by an additive constant (twice the saturated-model
        log-likelihood), which simplifies the computation without
        affecting model-selection rankings. Different authors throw in
        multiplicative or additive factors willy-nilly since they do
        not affect model selection.
        """
        p = self.dof()
        if not self._known_dispersion:
            # If we are estimating the dispersion, we need to add one
            # to the DOF.
            p += 1
        # The deviance is twice the dispersion times the
        # log-likelihood difference, so no extra factor of two is
        # required here.
        return self.deviance() / self.dispersion() + 2.0 * p
        # An alternative, per-observation normalization (not used):
        # return (self.deviance() / self._num_obs
        #         + 2. * p * self.dispersion() / self._num_obs)
[docs]    def aicc(self) -> float:
        """AIC corrected for finite sample size.

        Applies the Hurvich-Tsai small-sample correction to ``aic()``:

            AICc = AIC + 2 * p * (p + 1) / (n - p - 1)

        where ``p`` is the effective number of parameters (``dof()``,
        plus one if the dispersion is being estimated) and ``n`` is
        the number of observations. This is Eqn. 6.32 on p. 304 of
        [GAMr]_. As ``n`` grows, the correction term vanishes and AICc
        reduces to AIC. AICc is preferable to AIC when ``n`` is small
        relative to ``p``.

        Returns ``+inf`` for an overparameterized fit
        (``n <= p + 1``), where the correction term is undefined and
        the model has no useful AICc.
        """
        p = self.dof()
        if not self._known_dispersion:
            # Match the convention in aic(): one extra parameter for
            # the estimated dispersion.
            p += 1
        n = self._num_obs
        if n - p - 1 <= 0:
            return float("inf")
        return self.aic() + 2.0 * p * (p + 1) / (n - p - 1)
[docs]    def bic(self) -> float:
        """Bayesian Information Criterion.

        Computed as

            BIC = deviance / dispersion + log(n) * p

        where ``p`` is the effective parameter count (``dof()``, plus
        one when the dispersion is estimated, matching ``aic()`` and
        ``aicc()``) and ``n`` is the number of observations. As with
        ``aic()``, the BIC is off by the same additive constant; only
        differences are meaningful for model selection.
        """
        p = self.dof()
        if not self._known_dispersion:
            p += 1
        return self.deviance() / self.dispersion() + float(np.log(self._num_obs)) * p
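    # Illustrative sketch tying the three criteria together: refit over
    # a hypothetical smoothing grid and tabulate aic/aicc/bic per
    # setting. Only ``fit`` (with its ``smoothing`` keyword) and the
    # criterion methods come from this module.
    def _example_information_criteria(
        self, X: pd.DataFrame, y: npt.NDArray[Any]
    ) -> dict[float, tuple[float, float, float]]:
        """Illustrative only: criterion values per smoothing setting."""
        table: dict[float, tuple[float, float, float]] = {}
        for s in (0.1, 1.0, 10.0):
            self.fit(X, y, smoothing=s)
            table[s] = (self.aic(), self.aicc(), self.bic())
        return table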
[docs]    def null_deviance(self) -> float:
        """Deviance of the intercept-only model on the training data.

        The null model predicts the same mean response for every
        observation: ``mean(y)`` (or ``sum(y) /
        sum(covariate_class_sizes)`` when binomial covariate classes
        were supplied at fit time).
        """
        y = self._y
        if self._has_covariate_classes:
            m: npt.NDArray[Any] | float = self._covariate_class_sizes
            mean_response = float(np.sum(y)) / float(np.sum(m))
        else:
            m = 1.0
            mean_response = float(np.mean(y))
        mu = np.full_like(y, mean_response, dtype=float)
        return self._deviance_from_mu(y, mu, m=m, w=self._weights)
[docs]    def r_squared(self) -> float:
        """Deviance-based pseudo-R^2.

        Returns ``1 - deviance() / null_deviance()``. For a normal
        family with identity link this is the conventional R^2; for
        other families it is the deviance pseudo-R^2 of McCullagh and
        Nelder. Returns ``nan`` when the null deviance is zero (the
        response is constant), where pseudo-R^2 is undefined.
        """
        null_dev = self.null_deviance()
        if null_dev == 0.0:
            return float("nan")
        return 1.0 - self.deviance() / null_dev
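    # For the normal family with identity link, deviance() is the
    # residual sum of squares and null_deviance() the total sum of
    # squares, so the pseudo-R^2 reduces to the conventional R^2. A
    # standalone sketch with hypothetical fitted means:
    @staticmethod
    def _example_pseudo_r_squared() -> float:
        y = np.array([1.0, 2.0, 4.0, 5.0])
        mu = np.array([1.2, 2.1, 3.8, 4.9])  # hypothetical fitted means
        dev = float(np.sum((y - mu) ** 2))  # normal-family deviance (RSS)
        null_dev = float(np.sum((y - np.mean(y)) ** 2))  # null deviance (TSS)
        return 1.0 - dev / null_dev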
[docs]    def ubre(self, gamma: float = 1.0) -> float:
        """Un-Biased Risk Estimator.

        Returns the Un-Biased Risk Estimator as discussed in Sections
        6.2.1 and 6.2.5 of [GAMr]_. Useful for choosing the smoothing
        parameter when the dispersion is known. As discussed in
        Section 6.2.5 of [GAMr]_, sometimes it is helpful to force
        smoother fits by exaggerating the effective degrees of
        freedom; a value of ``gamma > 1`` may be desirable in that
        case.
        """
        return self.deviance() + 2.0 * gamma * self.dispersion() * self.dof()
[docs]    def gcv(self, gamma: float = 1.0) -> float:
        """Generalized Cross Validation score.

        Useful for choosing the smoothing parameter when the
        dispersion is unknown. As discussed in Section 6.2.5 of
        [GAMr]_, sometimes it is helpful to force smoother fits by
        exaggerating the effective degrees of freedom; a value of
        ``gamma > 1`` may be desirable in that case.
        """
        denom = self._num_obs - gamma * self.dof()
        return self._num_obs * self.deviance() / (denom * denom)
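    # Illustrative sketch of criterion-driven smoothing selection: UBRE
    # when the dispersion is known, GCV otherwise, per the two
    # docstrings above. The grid is hypothetical; ``fit``, ``ubre``,
    # and ``gcv`` are the module's own API.
    def _example_select_smoothing_by_criterion(
        self, X: pd.DataFrame, y: npt.NDArray[Any]
    ) -> float:
        """Illustrative only: pick ``smoothing`` by UBRE or GCV."""
        best_s, best_score = 0.0, np.inf
        for s in (0.01, 0.1, 1.0, 10.0):
            self.fit(X, y, smoothing=s)
            score = self.ubre() if self._known_dispersion else self.gcv()
            if score < best_score:
                best_s, best_score = s, score
        return best_s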
[docs]    def summary(self) -> None:
        """Print summary statistics for the fitted model.

        Prints statistics for the overall model and for each
        individual feature (see the ``__str__`` method on each feature
        class for the per-feature output). The overall model section
        reports:

        ============ =====================================================
        ``phi``      Estimated dispersion parameter. Omitted when the
                     dispersion was supplied at construction time or is
                     fixed by the family (e.g. Poisson).
        ``edof``     Estimated degrees of freedom.
        ``Deviance`` Twice the dispersion times the difference between
                     the log-likelihood of the saturated model and that
                     of the fitted model.
        ``AIC``      Akaike Information Criterion.
        ``AICc``     AIC corrected for finite data sets.
        ``BIC``      Bayesian Information Criterion.
        ``R^2``      Deviance-based pseudo-:math:`R^2`.
        ``UBRE``     Unbiased Risk Estimator (when the dispersion is
                     known).
        ``GCV``      Generalized Cross Validation (when the dispersion
                     is estimated).
        ============ =====================================================

        See the corresponding methods (:meth:`aic`, :meth:`aicc`,
        :meth:`bic`, :meth:`r_squared`, :meth:`ubre`, :meth:`gcv`) for
        definitions.
        """
        print("Model Statistics")
        print("----------------")
        if not self._known_dispersion:
            print(f"phi: {self.dispersion():0.06g}")
        print(f"edof: {self.dof():0.0f}")
        print(f"Deviance: {self.deviance():0.06g}")
        print(f"AIC: {self.aic():0.06g}")
        print(f"AICc: {self.aicc():0.06g}")
        print(f"BIC: {self.bic():0.06g}")
        print(f"R^2: {self.r_squared():0.06g}")
        if self._known_dispersion:
            print(f"UBRE: {self.ubre():0.06g}")
        else:
            print(f"GCV: {self.gcv():0.06g}")
        print()
        print("Features")
        print("--------")
        for feature in self._features.values():
            print(str(feature))
[docs]def fit_adaptive_lasso(
    gam: GAM,
    X: pd.DataFrame,
    y: npt.NDArray[Any],
    *,
    gamma: float = 1.0,
    eps: float = 1e-6,
    **fit_kwargs: Any,
) -> GAM:
    """Fit ``gam`` via the two-stage adaptive lasso (Zou, 2006).

    The first stage fits ``gam`` to ``(X, y)`` using whatever L1
    regularization the user configured on each feature -- producing a
    pilot estimate of every coefficient. The second stage rewrites
    each L1-regularized feature's per-coefficient L1 weight to
    ``base / (|pilot| + eps) ** gamma`` and refits. Coefficients with
    large pilot magnitudes are penalized less; tiny pilots (likely
    noise) are penalized more, recovering the "oracle" sparsity
    pattern with less bias on the truly active coefficients than plain
    L1.

    Both stages are convex (they are ordinary weighted-L1 fits at
    fixed weights), so this is just a thin wrapper over two
    ``GAM.fit`` calls.

    Parameters
    ----------
    gam : GAM
        Fully configured but not-yet-fit GAM. At least one feature
        must carry a ``regularization={"l1": {"coef": ...}}``
        configuration; other features and other regularizers (l2,
        group_lasso, ...) ride along untouched. The same ``gam``
        object is mutated in place and returned.
    X, y : data
        Forwarded to both ``gam.fit`` calls.
    gamma : float
        Adaptive-lasso exponent (typically 1.0). Larger ``gamma``
        makes the second-stage weights swing harder around the pilot
        magnitudes.
    eps : float
        Stabilizer added to the pilot magnitude before exponentiation,
        so a pilot of exactly zero produces a finite (though large)
        second-stage weight rather than a divide-by-zero.
    **fit_kwargs
        Forwarded to both ``gam.fit`` calls. Both stages see the same
        ``smoothing``, ``max_its``, ``weights``, etc.

    Returns
    -------
    gam : GAM
        The same model, now fit with adaptive L1 weights.
    """
    if gamma <= 0.0:
        raise ValueError(f"gamma must be positive; got {gamma}.")
    if eps <= 0.0:
        raise ValueError(f"eps must be positive; got {eps}.")

    has_l1 = any(getattr(f, "_has_l1", False) for f in gam._features.values())
    if not has_l1:
        raise ValueError(
            "fit_adaptive_lasso requires at least one feature with "
            'regularization={"l1": ...}; without an L1 term to reweight, '
            "the second stage is identical to the first."
        )

    gam.fit(X, y, **fit_kwargs)

    rewrote_any = False
    for feature in gam._features.values():
        apply = getattr(feature, "_apply_adaptive_l1", None)
        if apply is None:
            continue
        if apply(gamma, eps):
            rewrote_any = True

    if not rewrote_any:
        # Defensive: should be impossible given the has_l1 guard above,
        # but feature subclasses without `_apply_adaptive_l1` would land
        # here. Skip the second fit rather than producing identical
        # output.
        return gam

    gam.fit(X, y, **fit_kwargs)
    return gam
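# Illustrative usage sketch for the wrapper above. The constructor and
# ``add_feature`` signatures shown here (family/link keywords; name,
# feature type, ``regularization=``) are assumptions for the sketch;
# the ``{"l1": {"coef": ...}}`` payload shape and ``fit_adaptive_lasso``
# itself come from this module.
def _example_adaptive_lasso() -> GAM:
    """Illustrative only: a two-stage adaptive-lasso fit."""
    model = GAM(family="normal", link="identity")
    model.add_feature("x1", "linear", regularization={"l1": {"coef": 1.0}})
    model.add_feature("x2", "linear", regularization={"l1": {"coef": 1.0}})
    rng = np.random.default_rng(0)
    X = pd.DataFrame({"x1": rng.normal(size=200), "x2": rng.normal(size=200)})
    y = 2.0 * X["x1"].to_numpy() + rng.normal(size=200)  # x2 is pure noise
    # Stage 1 fits the pilot; stage 2 reweights each L1 penalty by
    # 1 / (|pilot| + eps) ** gamma and refits.
    return fit_adaptive_lasso(model, X, y, gamma=1.0, eps=1e-6)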