# pyre-unsafe
"""Core bootstrap sampling functions."""
import multiprocessing as mp
import warnings
from typing import Any
import numpy as np
import numpy.typing as npt
import pandas as pd
from pathos.multiprocessing import ProcessPool as Pool
from bootstrap_stat._utils import (
ArrayLike,
RNGLike,
Statistic,
_apply_rng,
_spawn_rngs,
)
from bootstrap_stat.distributions import EmpiricalDistribution
[docs]
def jackknife_values(
x: ArrayLike | tuple[ArrayLike, ...],
stat: Statistic,
sample: int | None = None,
num_threads: int = 1,
) -> npt.NDArray[np.float64]:
"""Compute jackknife values.
Parameters
----------
x : array_like or pandas DataFrame or tuple of arrays/DataFrames.
The data.
stat : function
The statistic.
sample : int, optional
When Jackknifing a multi-sample distribution, like for an A/B
test, we generate one set of jackknife values for each
sample. The caller should specify which sample for which
jackknife values should be generated, calling this function
once for each sample.
num_threads : int, optional
Number of threads to use for multicore processing. Defaults to
1, meaning all calculations will be done in a single
thread. Set to -1 to use all available cores.
Returns
-------
jv : ndarray
The jackknife values.
See Also
--------
bootstrap_samples : Generate bootstrap samples from an empirical
distribution.
Notes
-----
The jackknife values consist of the statistic applied to a
collection of datasets derived from the original by holding out
each observation in turn. For example, let x1 be the dataset
corresponding to x, but with the first datapoint removed. The
first jackknife value is simply stat(x1).
"""
if num_threads == -1:
num_threads = mp.cpu_count()
if sample is not None and isinstance(x, tuple):
# Multi-sample jackknife. Create a new statistic that is
# simply a wrapper around the desired `stat`. Only perform the
# hold-out logic on the specified sample.
x = list(x)
x_b = x[0:sample]
x_s = x[sample]
x_a = x[(sample + 1) :]
def statistic(zz):
xx = x_b + [zz] + x_a
return stat((*xx,))
return jackknife_values(x_s, statistic, num_threads=num_threads)
if isinstance(x, pd.DataFrame) or isinstance(x, pd.Series):
is_dataframe = True
n = len(x.index)
else:
is_dataframe = False
x = np.array(x)
n = len(x)
def _jackknife_sim(x, stat, is_dataframe, start, end):
n = end - start
theta_i = np.empty((n,))
for i in range(start, end):
if is_dataframe:
xi = x.drop(x.index[i])
else:
xi = np.delete(x, i)
theta_i[i - start] = stat(xi)
return theta_i
if num_threads == 1:
theta_i = _jackknife_sim(x, stat, is_dataframe, 0, n)
else:
pool = Pool(num_threads)
try:
pool.restart()
except AssertionError:
pass
results = []
batch_size = n // num_threads
extra = n % num_threads
batch_sizes = [batch_size] * num_threads
for i in range(extra):
batch_sizes[i] += 1
start = 0
for i in range(num_threads):
end = start + batch_sizes[i]
r = pool.apipe(_jackknife_sim, x, stat, is_dataframe, start, end)
results.append(r)
start = end
theta_i = np.hstack([res.get() for res in results])
pool.close()
pool.join()
return theta_i
[docs]
def multithreaded_bootstrap_samples(
dist: EmpiricalDistribution,
stat: Statistic | dict[str, Statistic],
B: int,
size: int | tuple[int, ...] | None = None,
jackknife: bool = False,
num_threads: int = -1,
rng: RNGLike = None,
) -> npt.NDArray[np.float64] | dict[str, npt.NDArray[np.float64]]:
"""Generate bootstrap samples in parallel.
Parameters
----------
dist : EmpiricalDistribution
The empirical distribution.
stat : function or dict_like
The statistic or statistics for which we wish to calculate the
standard error. If you want to compute different statistics on
the same bootstrap samples, specify as a dictionary having
functions as values.
B : int
Number of bootstrap samples.
size : int or tuple of ints, optional
Size to pass for generating samples. Defaults to None.
num_threads : int, optional
Number of threads to use. Defaults to the number of available
CPUs.
rng : Generator, SeedSequence, int, or None, optional
Source of randomness. If provided, ``num_threads`` independent
child streams are spawned via ``SeedSequence.spawn`` so each
worker draws from a statistically-independent stream. If not
provided, ``dist`` 's existing rng is used as the parent.
Returns
-------
theta_star : ndarray or dictionary
Array of bootstrapped statistic values. When multiple
statistics are being calculated on the same bootstrap samples,
the return value will be a dictionary having keys the same as
`stat` and values an ndarray for each statistic.
See Also
--------
bootstrap_samples : Single-threaded bootstrap sampling with
optional jackknife support.
Notes
-----
Jackknifing is not currently supported.
"""
if num_threads == -1:
num_threads = mp.cpu_count()
def _bootstrap_sim(dist, stat, size, B, worker_rng):
if isinstance(stat, dict):
theta_star = {k: np.empty((B,)) for k in stat}
else:
theta_star = np.empty((B,))
# Each worker is a separate process; mutating `dist._rng` here
# only affects this worker's copy.
_apply_rng(dist, worker_rng)
for i in range(B):
x_star = dist.sample(size=size)
if isinstance(stat, dict):
for k, v in stat.items():
theta_star[k][i] = v(x_star)
else:
theta_star[i] = stat(x_star)
return theta_star
pool = Pool(num_threads)
try:
# If we have used a pool before, we need to restart it.
pool.restart()
except AssertionError:
# If have never used a pool before, no need to do anything.
pass
results = []
batch_size = B // num_threads
extra = B % num_threads
batch_sizes = [batch_size] * num_threads
for i in range(extra):
batch_sizes[i] += 1
parent_rng = np.random.default_rng(rng) if rng is not None else dist._rng
worker_rngs = _spawn_rngs(parent_rng, num_threads)
for i, worker_rng in enumerate(worker_rngs):
r = pool.apipe(_bootstrap_sim, dist, stat, size, batch_sizes[i], worker_rng)
results.append(r)
if isinstance(stat, dict):
theta_star = {k: [] for k in stat}
for res in results:
t = res.get()
for k in stat:
theta_star[k].extend(t[k])
theta_star = {k: np.array(v) for k, v in theta_star.items()}
else:
theta_star = np.hstack([res.get() for res in results])
pool.close()
pool.join()
return theta_star
[docs]
def bootstrap_samples(
dist: EmpiricalDistribution,
stat: Statistic | dict[str, Statistic],
B: int,
size: int | tuple[int, ...] | None = None,
jackknife: bool = False,
num_threads: int = 1,
rng: RNGLike = None,
) -> (
npt.NDArray[np.float64]
| dict[str, npt.NDArray[np.float64]]
| tuple[npt.NDArray[np.float64] | dict[str, npt.NDArray[np.float64]], list[Any]]
):
r"""Generate bootstrap samples.
Parameters
----------
dist : EmpiricalDistribution
The empirical distribution.
stat : function or dict_like
The statistic or statistics for which we wish to calculate the
standard error. If you want to compute different statistics on
the same bootstrap samples, specify as a dictionary having
functions as values.
B : int
Number of bootstrap samples.
size : int or tuple of ints, optional
Size to pass for generating samples. Defaults to None.
jackknife : boolean, optional
If True, returns an array of jackknife bootstrap statistics
(see the `jackknife_array` return value). Defaults to False.
num_threads : int, optional
Number of threads to use for multicore processing. Defaults to
1, meaning all calculations will be done in a single
thread. Set to -1 to use all available cores.
rng : Generator, SeedSequence, int, or None, optional
Source of randomness. If provided, takes precedence over the
rng stored on ``dist``; the dist's rng is replaced with one
derived from this argument so subsequent calls remain
reproducible. If ``None`` (default), uses ``dist._rng`` as-is.
Returns
-------
theta_star : ndarray or dictionary
Array of bootstrapped statistic values. When multiple
statistics are being calculated on the same bootstrap samples,
the return value will be a dictionary having keys the same as
`stat` and values an ndarray for each statistic.
jackknife_array : ndarray
Array of n arrays or dicts, where n is the number of elements
of the original dataset upon which the empirical distribution
is based. Each element of this array is in turn either an
array or dict according to `stat`. If `stat` is just a single
function, it will be an array, otherwise a dict. The ith
element of the inner array will be those bootstrap statistics
corresponding to samples not including the ith value from the
original dataset. (For jackknifing, only samples not including
the ith datapoint can be used for inferences involving the ith
point.) See [ET93]_ (S19.4) for details. Only returned if
`jackknife` is True.
See Also
--------
jackknife_values : Compute jackknife values directly.
multithreaded_bootstrap_samples : Parallel-only bootstrap sampling.
Notes
-----
For each of the :math:`B` bootstrap replicates, a sample of size
:math:`n` is drawn with replacement from the empirical distribution
:math:`\hat{F}`, and ``stat`` is applied to produce
:math:`\hat{\theta}^*_b`. This collection forms the bootstrap
distribution, from which standard errors, confidence intervals, and
significance levels can be estimated.
When ``jackknife=True``, the function tracks which original
observations appear in each bootstrap sample. The
jackknife-after-bootstrap approach in [ET93]_ (S19.4) uses this
structure to estimate the influence of each observation on the
bootstrap distribution without additional simulation.
"""
if num_threads == -1:
num_threads = mp.cpu_count()
if jackknife and num_threads > 1:
warnings.warn(
"Multicore support for jackknifing not yet supported. "
"Continuing with 1 core"
)
num_threads = 1
if rng is not None:
_apply_rng(dist, rng)
if num_threads > 1:
return multithreaded_bootstrap_samples(
dist, stat, B, size=size, num_threads=num_threads
)
if isinstance(stat, dict):
theta_star = {k: np.empty((B,)) for k in stat}
else:
theta_star = np.empty((B,))
if jackknife:
jackknife_array = None
for i in range(B):
if jackknife:
x_star, ind = dist.sample(size=size, return_indices=True)
else:
x_star = dist.sample(size=size)
if jackknife and jackknife_array is None:
n = dist.n
if isinstance(stat, dict):
jackknife_array = [{k: []} for _ in range(n) for k in stat]
else:
jackknife_array = [[] for _ in range(n)]
if isinstance(stat, dict):
for k, v in stat.items():
theta_star[k][i] = v(x_star)
else:
theta_star[i] = stat(x_star)
if jackknife:
for j in range(len(jackknife_array)):
if j not in ind:
if isinstance(stat, dict):
for k in stat:
jackknife_array[j][k] = theta_star[k][i]
else:
jackknife_array[j].append(theta_star[i])
if jackknife:
return theta_star, jackknife_array
else:
return theta_star