Source code for pyest.gm.gm

import os
from .. utils import protected_cached_property

import cvxopt
import jax
import numpy as np
import pandas as pd
from numpy import pi, sqrt
from copy import copy
from numpy.random import rand, randn
from scipy.stats._multivariate import _LOG_2PI, _PSD, _squeeze_output
from scipy.stats import Covariance, _covariance
from scipy.stats import multivariate_normal
from scipy.linalg import cholesky
from pyest.linalg import triangularize, cholesky_from_sqrt_precision

from huggingface_hub import hf_hub_download

__all__ = [
    'GaussianMixture',
    'GaussSplitOptions',
    'Covariance',
    'multivariate_normal',
    'fit_1d_uniform',
    'gm_fit_1d',
    'equal_weights',
    'eval_gmpdf',
    'eval_gmpdfchol',
    'eval_mvnpdf',
    'eval_mvnpdfchol',
    'v_eval_mvnpdf',
    'v_eval_mvnpdfchol',
    'integral_gauss_product',
    'integral_gauss_product_chol',
    'integral_squared_gm',
    'marginal_2d',
    'marginal_nd',
    'comp_bounds',
    'bounds',
    'gm_pdf_2d',
    'sigma_contour',
    'distribute_mean_centers',
    'optimal_homoscedastic_std',
]

sigmavals = None


def load_1d_unifit_sigmavals():
    file_in_repo = "l2optimalunifitsiglib.parquet"
    repo_id = "klegrand/l2optimalunifitsiglib"
    local_file_name = ".data/l2optimalunifitsiglib.parquet"
    # Check if file exists locally
    if not os.path.exists(local_file_name):
        print(f"Local file '{local_file_name}' not found. Downloading from Hugging Face...")
        hf_hub_download(
            repo_id=repo_id,
            repo_type='dataset',
            filename=file_in_repo,
            local_dir=".data",
            local_dir_use_symlinks=False
        )
        print("Download complete!")
    else:
        print(f"Local file '{local_file_name}' already exists.")
    return np.array(pd.read_parquet(local_file_name))


[docs] def distribute_mean_centers(lb, ub, L): """ distribute L mean centers between lb and ub""" dmi = 1.0/(L + 1.0) mi = np.cumsum(dmi*np.ones([L, 1])) return lb + (ub - lb)*mi
def __check_width(width): """asserts that width is positive""" assert width > 0 def __check_num_comp(L): assert isinstance(L, int) assert L >= 1 assert L <= 15000 def find_gm_res(stdmax, width=1): """find necessary number of components needed to meet comp. std max""" global sigmavals # noqa: F824 __check_width(width) if sigmavals is None: sigmavals = load_1d_unifit_sigmavals() L = int(np.where(sigmavals*width <= stdmax)[0][0]) return L
[docs] def optimal_homoscedastic_std(L, width=1): """generates optimal std for an L component GM under homoscedasticity constraint""" global sigmavals # noqa: F824 __check_width(width) __check_num_comp(L) if sigmavals is None: sigmavals = load_1d_unifit_sigmavals() return width*sigmavals[L-1]
[docs] def fit_1d_uniform(lb, ub, L=100): """approximates 1D uniform distribution with GM Parameters ---------- lb: float lower bound of uniform distribution ub: float upper bound of uniform distribution L : int, optional number of components to use in approximation. Default is 100 Returns ------- GaussianMixture """ sx = optimal_homoscedastic_std(L, width=ub-lb) mk = distribute_mean_centers(lb, ub, L).reshape((L, 1)) wk = equal_weights(L) Pk = np.tile(sx*sx, (L, 1, 1)) return GaussianMixture(wk, mk, Pk)
def fit_weights_to_f_1d(xvals, fvals, m, std): """optimize weights of GM components m,std to fit xvals,fvals assumes m values are evenly-spaced assumes homoscedasticity (equal covariances/std) """ L = len(m) d = 1.0/(sqrt(2.0*pi)*std) X, M = np.meshgrid(xvals, m) H = d*np.exp(-0.5*((X.T - M.T)/std)**2) """ for i in range(len(marg)): rhoi = xvals[i] for j in range(Lx): H[i,j] = d*exp(-0.5*((rhoi - mx[j])/sx)**2) """ Hqp = cvxopt.matrix(H.T @ H) fqp = cvxopt.matrix((-H.T @ fvals).reshape((H.shape[1], 1))) Aqp = cvxopt.matrix(-np.eye(L)) bqp = cvxopt.matrix(np.zeros(L).reshape((L, 1))) Aeqqp = cvxopt.matrix(np.ones(L).reshape((1, L))) beqqp = cvxopt.matrix(1.0) cs = cvxopt.solvers cs.options['show_progress'] = False cs.options['glpk'] = dict(msg_lev='GLP_MSG_OFF') qp_sol = cs.qp(Hqp, fqp, Aqp, bqp, Aeqqp, beqqp) w = np.array(qp_sol['x']).squeeze() return w
[docs] def gm_fit_1d(xvals, pvals, support=None, stdmax=None, L=None): global sigmavals # noqa: F824 if support is None: support = (min(xvals), max(xvals)) # TODO: check that only either stdmax or L is defined lb, ub = support width = ub - lb L = find_gm_res(stdmax, width) mx = distribute_mean_centers(lb, ub, L).reshape((L, 1)) sx = optimal_homoscedastic_std(L, width) wx = fit_weights_to_f_1d(xvals, pvals, mx, sx) Sk = np.tile(sx, (1, 1, L)) return GaussianMixture(wx, mx, Sk, 'cholesky')
[docs] def equal_weights(L): """ generate L equal weights that sum to 1 Parameters ---------- L : int number of weights Returns ------- ndarray """ __check_num_comp(L) return np.tile(1.0/L, L)
[docs] def eval_gmpdf(x, w, m, P): """ evaluates Gaussian mixture at points x Parameters ---------- x : array_like (n_samp, vect_length) points at which to evaluate the GM w : array_like (nC,) Gaussian mixture weights m : array_like (nC, vect_length) Gaussian mixture means P : array_like (nC, vect_length, vect_length) Gaussian mixture covariances Returns ------- ndarray (n_samp,) Gaussian mixture evaluated at points x """ x = np.array(x) if x.shape[-1] != m.shape[-1]: raise ValueError('The last dimensions of x and m must match.') q = (w[:, np.newaxis]*np.array(v_eval_mvnpdf(x, m, P))).sum(axis=0) return q.squeeze()
[docs] def eval_gmpdfchol(x, w, m, S): """ evaluates Gaussian mixture at points x Parameters ---------- x : array_like (n_samp, vect_length) points at which to evaluate the GM w : array_like (nC,) Gaussian mixture weights m : array_like (nC, vect_length) Gaussian mixture means S : array_like (nC, vect_length, vect_length) Gaussian mixture lower-triangular Cholesky factors Returns ------- ndarray (n_samp,) Gaussian mixture evaluated at points x """ x = np.array(x) if x.shape[-1] != m.shape[-1]: raise ValueError('The last dimensions of x and m must match.') q = (w[:, np.newaxis]*np.array(v_eval_mvnpdfchol(x, m, S))).sum(axis=0) return q.squeeze()
def optimized_eval_gmpdf(x, w, m, Schol, log_pdet): """ Evaluates Gaussian mixture at points x Parameters ---------- x : array_like (n_samp, vect_length) points at which to evaluate the GM w : array_like (nC,) Gaussian mixture weights m : array_like (nC, vect_length) Gaussian mixture means Schol : array_like (num_comp, vect_length, vect_length) Gaussian mixture covariance lower-triangular Cholesky factors log_pdet : array_like (nC,) log of the determinant of the covariance matrix Returns ------- ndarray (n_samp,) Gaussian mixture evaluated at points x """ x = np.asarray(x) w = np.asarray(w) m = np.asarray(m) if x.shape[1] != m.shape[1]: raise ValueError('The last dimensions of x and m must match.') vect_length = x.shape[1] # Vectorized computation of multivariate normal PDFs diff = x[:, np.newaxis, :] - m[np.newaxis, :, :] # (n_eval, n_comp, vect_length) # y = np.vectorize( # lambda S,x:solve_triangular(S,x.T,lower=True).T, # signature='(n,n),(n)->(n)')(Schol[np.newaxis,:,:], diff) y = jax.scipy.linalg.solve_triangular(np.tile(Schol, (x.shape[0], 1, 1, 1)), diff, lower=True) maha_dist = np.sum(y**2, axis=2) # Compute log-likelihoods log_likelihood = -0.5 * (vect_length * _LOG_2PI + log_pdet + maha_dist) # Compute weighted probabilities q = np.sum(w[np.newaxis, :] * np.exp(log_likelihood), axis=1) return q.squeeze() def eval_logmvnpdf(x, mean, prec_U, log_det_cov, rank): """ Parameters ---------- x : ndarray Points at which to evaluate the log of the probability density function mean : ndarray Mean of the distribution prec_U : ndarray A decomposition such that np.dot(prec_U, prec_U.T) is the precision matrix, i.e. inverse of the covariance matrix. log_det_cov : float Logarithm of the determinant of the covariance matrix rank : int Rank of the covariance matrix. Notes ----- As this function does no argument checking, it should not be called directly; use 'logpdf' instead. """ dev = x - mean maha = np.sum(np.square(np.dot(dev, prec_U)), axis=-1) return -0.5 * (rank * _LOG_2PI + log_det_cov + maha)
[docs] def eval_mvnpdf(x, m, P, allow_singular=False): """ Multivariate normal probability density function. Parameters ---------- x : array_like Quantiles, with the last axis of `x` denoting the components. %(_mvn_doc_default_callparams)s Returns ------- pdf : ndarray or scalar Probability density function evaluated at `x` Notes ----- %(_mvn_doc_callparams_note)s """ psd = _PSD(P, allow_singular=allow_singular) out = np.exp(eval_logmvnpdf(x, m, psd.U, psd.log_pdet, psd.rank)) return _squeeze_output(out)
[docs] def eval_mvnpdfchol(x, m, S, allow_singular=False): """ Multivariate normal probability density function. Parameters ---------- x : array_like Quantiles, with the last axis of `x` denoting the components. m : ndarray (nx,) mean vector S : ndarray (nx, nx) lower-triangular Cholesky factor of covariance matrix Returns ------- pdf : ndarray or scalar Probability density function evaluated at `x` """ nx = m.shape[-1] log_pdet = 2*np.log(np.diag(S)).sum(axis=-1) residuals = x - m[np.newaxis, :] mahalanobis_dist_sqrd = np.array([np.linalg.norm(np.linalg.solve(S, res))**2 for res in residuals]) out = np.exp(-0.5*(nx*_LOG_2PI + log_pdet + mahalanobis_dist_sqrd)) return _squeeze_output(out)
[docs] def v_eval_mvnpdf(x, m, P): return [np.atleast_1d(eval_mvnpdf(x, mi, Pi)) for mi, Pi in zip(m, P)]
[docs] def v_eval_mvnpdfchol(x, m, S): return [np.atleast_1d(eval_mvnpdfchol(x, mi, Si)) for mi, Si in zip(m, S)]
def eig_sqrt_factor(P): """ compute eigenvector based sqrt factor of covariance Parameters ---------- P: ndarray (n,n) covariance matrix Returns ------- ndarray eigenvector based sqrt factor """ eigvals, eigvecs = np.linalg.eigh(np.atleast_2d(P)) return eigvecs@np.diag(eigvals**0.5)
[docs] def integral_gauss_product_chol(m1, S1, m2, S2): ''' compute integral of product of two Gaussians Parameters ---------- m1: np.ndarray mean of Gaussian 1 S1: np.ndarray Cholesky factor of covariance of Gaussian 1 m2: np.ndarray mean of Gaussian 2 S2: np.ndarray Cholesky factor of covariance of Gaussian 2 Returns ------- integral: float integral of the product of the two Gaussians ''' assert(all(np.diag(S1) >= 0)) assert(all(np.diag(S2) >= 0)) # compute the product of the two Gaussians S1pS2 = triangularize(np.hstack((S1, S2))) return eval_mvnpdfchol(m1, m2, S1pS2)
[docs] def integral_gauss_product(m1, P1, m2, P2, allow_singular=False): ''' compute integral of product of two Gaussians Parameters ---------- m1: np.ndarray mean of Gaussian 1 P1: np.ndarray covariance of Gaussian 1 m2: np.ndarray mean of Gaussian 2 P2: np.ndarray covariance of Gaussian 2 Returns ------- integral: float integral of the product of the two Gaussians ''' return multivariate_normal.pdf(m1, m2, P1 + P2, allow_singular=allow_singular)
[docs] def integral_squared_gm(p): ''' compute integral of squared Gaussian mixture Parameters ---------- p : GaussianMixture Gaussian mixture Returns ------- integral : float integral of the squared Gaussian mixture ''' return np.sum([ wi*wj*eval_mvnpdf(mi, mj, Pi + Pj) for (wi, mi, Pi) in p for (wj, mj, Pj) in p ])
[docs] def marginal_2d(m, P, dimensions=[0, 1]): """ compute 2D marginal of GM""" return marginal_nd(m, P, dimensions)
[docs] def marginal_nd(m, P, dimensions): """ returns nd-mariginal GM over specified dimensions """ dimensions = np.array(dimensions) mnd = m[:, dimensions] Pnd = P[:, dimensions[:, np.newaxis], dimensions] return mnd, Pnd
[docs] def comp_bounds(m, P, sigma_mult=3): """ find lower and upper sigma bounds for each component """ diag_ind = np.arange(m.shape[-1]) dev = sigma_mult*np.sqrt(P[:, diag_ind, diag_ind]) lb = m - dev ub = m + dev return lb, ub
[docs] def bounds(m, P, sigma_mult=3): """ find bounds of GM """ lb, ub = comp_bounds(m, P, sigma_mult) return np.min(lb, axis=0), np.max(ub, axis=0)
def print_3d_mat(A): for idx in range(A.shape[2]): print("[:, :, {}] = \n{}\n".format(idx, str(A[:, :, idx])))
[docs] def gm_pdf_2d(w, m, P, dimensions=(0, 1), res=100, xbnd=None, ybnd=None): """ evaluate GM pdf in two dimensions Parameters ---------- w: ndarray (nC,) weights m: ndarray (nC, nx) means P: ndarray (nC, nx, nx) covariances Optional -------- dimensions: tuple dimensions to evaluate pdf over. Defaults to (0, 1) res: int resolution of the grid. Defaults to 100 xbnd: tuple x-axis bounds. Determined automatically if not provided ybnd: tuple y-axis bounds. Determined automatically if not provided Returns ------- ndarray (res, res) pdf values ndarray (res, res) x values ndarray (res, res) y values """ w = np.atleast_1d(w) m = np.atleast_2d(m) if P.ndim == 2: # make P a 3D array P = np.expand_dims(P, axis=0) m2d, P2d = marginal_2d(m, P, dimensions) if not xbnd or not ybnd: lb, rb = bounds(m2d, P2d) if not xbnd: xbnd = (lb[0], rb[0]) if not ybnd: ybnd = (lb[1], rb[1]) xvec = np.linspace(*xbnd, res) yvec = np.linspace(*ybnd, res) X, Y = np.meshgrid(xvec, yvec) p = 0.0 for wi, mi, Pi in zip(w, m2d, P2d): S, U = np.linalg.eigh(Pi) # eigenvalues, eigenvectors V = U.T L = 1.0/S D = 1.0/np.sqrt(np.prod(2*np.pi*S)) # normal distribution denominator N = V @ mi dx = V[0, 0]*X + V[0, 1]*Y - N[0] dy = V[1, 0]*X + V[1, 1]*Y - N[1] pe = np.exp(-0.5*(dx*dx*L[0] + dy*dy*L[1])) p = p + wi*D*pe return p, X, Y
[docs] def sigma_contour(m, P, sig_mul=1, num_pts=100): """ compute sigma contours of a 2D Gaussian Parameters ---------- m: ndarray (2,) mean vector P: ndarray (2, 2) covariance matrix sig_mul: float, optional sigma multiplier factor. For example, sig_mul=2 will return points corresponding to the 2-sigma contour. Defaults to sig_mul=1 num_pts: int, optional number of points to compute. Defaults to num_pts=100 Returns ------- ndarray (num_pts, 2) array of points along the contour """ theta = np.linspace(0, 2*np.pi, num_pts) ze = np.vstack((np.cos(theta), np.sin(theta))) S = cholesky(P, lower=True) # cholesky sqrt factor return m + np.dot(sig_mul*S, ze).T
[docs] class GaussianMixture(object):
[docs] def __init__(self, w, m, cov, cov_type='full', Seig=None): """ Parameters ---------- w: array_like (nC,) array of mixand weights m: array_like (nC,nx) array of mixand means cov: array_like (nC,nx,nx) array of covariances. Covariances can be specified in different ways, as specified by the optional cov_type argument Optional -------- cov_type: str type of covariance matrix. Defaults to 'full'. Seig: 3d array eigenvalues and eigenvectors of the covariance matrix. Defaults to None. """ self.set_w(w) self.set_m(m) self._Seig = Seig # directly write Seig here as to not overwrite P self._set_cov(cov, cov_type) # check that equal numbers of weights, means, and covariances are provided if len(self.w) != len(self.m) or len(self.w) != len(self._cov): raise ValueError("Number of weights, means, and covariances must match.") self.set_msize(self._cov[0].covariance.shape[-1])
def __getitem__(self, ind): return self.get_comp(ind) def _not_allowed(self): raise RuntimeError("Can not set size directly.") def __add__(self, gm2): if isinstance(gm2, int) and gm2 == 0: # Explicitly check for integer 0 return self if not isinstance(gm2, GaussianMixture): raise TypeError("Can only add GaussianMixture objects or 0.") w = np.hstack((self.w, gm2.w)) m = np.vstack((self.m, gm2.m)) # TODO: use cov objects to add covariances P = np.vstack((self.P, gm2.P)) return GaussianMixture(w, m, P) def __radd__(self, gm2): return self.__add__(gm2) def __mul__(self, a): return GaussianMixture(a*self.w, self.m, self._cov) def __rmul__(self, a): return self.__mul__(a) def __call__(self, x): return self.eval(x) def __len__(self): return len(self.w) def __eq__(self, oth): if ~np.all(self.w == oth.w): return False elif ~np.all(self.m == oth.m): return False elif ~np.all(self.P == oth.P): return False return True def _compute_eig_sqrt_factors(self): return np.array([eig_sqrt_factor(cov.covariance) for cov in self._cov])
[docs] def get_size(self): """ Returns the number of components in the Gaussian mixture model. """ return len(self.w)
# def _set_size(self): # self._size = len(self._w)
[docs] def get_w(self): """ Returns the weights (1d array) of the Gaussian mixture model. """ return self._w
[docs] def set_w(self, w): """ Sets the weights (1d array) of the Gaussian mixture model. """ self._w = np.atleast_1d(w)
[docs] def get_m(self): """ Returns the means (2d array (nC,nx)) of the Gaussian mixture model. """ return self._m
[docs] def set_m(self, m): """ Sets the means (2d array (nC,nx)) of the Gaussian mixture model. """ self._m = np.atleast_2d(m)
[docs] def get_P(self, ind=None): """ Returns the covariance matrices (3d array (nC,nx,nx)) of the Gaussian mixture model. """ if ind is not None: if np.isscalar(ind): return self._cov[ind].covariance else: return np.array([P.covariance for P in np.array(self._cov)[ind]]) return np.array([P.covariance for P in self._cov])
[docs] def set_P(self, P): """ Sets the covariance matrices (3d array (nC,nx,nx)) of the Gaussian mixture model. """ self._set_cov(P, 'full')
def _set_cov(self, cov, cov_type): if isinstance(np.atleast_1d(cov)[0], Covariance): self._cov = np.atleast_1d(cov) return if cov_type == 'full' or cov_type == 'cholesky': cov = np.atleast_2d(cov) if cov.ndim == 2: covarr = cov[np.newaxis, :, :] else: covarr = cov if cov_type == 'full': # compute PSD objects psd = [_PSD(P) for P in covarr] self._cov = [_covariance.CovViaPSD(psdi) for psdi in psd] return elif cov_type == 'cholesky': self._cov = [Covariance.from_cholesky(Si) for Si in covarr] return elif cov_type == 'eigendecomposition': self._cov = [Covariance.from_eigendecomposition(ed) for ed in covarr] return else: raise ValueError('cov_type must be one of "full", "cholesky", or "eigendecomposition"')
[docs] def set_Seig(self, S): """ Sets the covariance matrix eigenvalues and eigenvectors. """ S = np.atleast_2d(S) if S.ndim == 2: self._Seig = S[np.newaxis, :, :] else: self._Seig = S # updated covariance values # TODO: rather than compute Si@Si.T, instantiate covariance object # using the eigendecomposition self._set_cov(np.array([Si@Si.T for Si in self._Seig]), 'full')
@protected_cached_property def Schol(self): ''' return the covariance matrix lower cholesky square root factor ''' if isinstance(self._cov[0], _covariance.CovViaCholesky): return np.array([cov._factor for cov in self._cov]) elif isinstance(self._cov[0], _covariance.CovViaPrecision): return np.array([cholesky_from_sqrt_precision(cov._chol_P) for cov in self._cov]) else: return np.array([cholesky(cov.covariance, lower=True) for cov in self._cov]) @protected_cached_property def Seig(self): ''' return the covariance matrix eigenvector square root factor ''' return self._compute_eig_sqrt_factors() @protected_cached_property def _PSD(self): if isinstance(self._cov[0], _covariance.CovViaPSD): return [cov._psd for cov in self._cov] return [_PSD(P) for P in self.P] @protected_cached_property def log_pdet(self): return np.array([cov._log_pdet for cov in self._cov]) @protected_cached_property def prec_sqrt(self): """ compute precision sqrt factor Returns ------- ndarray (nC,nx,nx) array of precision matrix square root factors, i.e. A decomposition such that np.dot(prec_U, prec_U.T) is the precision matrix, i.e. inverse of the covariance matrix.""" if isinstance(self._cov[0], _covariance.CovViaCholesky): return np.array([np.linalg.inv(cov._factor.T) for cov in self._cov]) if isinstance(self._cov[0], _covariance.CovViaPrecision): return np.array([cov._chol_P for cov in self._cov]) if isinstance(self._cov[0], _covariance.CovViaPSD): return np.array([cov._LP for cov in self._cov]) return np.array([psd.U for psd in self._PSD])
[docs] def get_msize(self): """get vector length of mean""" return self._msize
[docs] def set_msize(self, msize): """set vector length of mean""" self._msize = msize
[docs] def get_comp(self, ind): return self.w[ind], self.m[ind], self.get_P(ind)
[docs] def eval(self, x): """ evaluate gm at n_eval points stored in numpy array x Required -------- x: ndarray (n_eval, nx) array of points at which to evaluate the GM Returns ------- ndarray (n_eval,) array of GM evaluations at points x """ x = np.atleast_2d(x) if x.shape[-1] != self.m.shape[-1]: raise ValueError( 'The last dimension of x must match the state dimension.') return _squeeze_output(optimized_eval_gmpdf(x, self.w, self.m, self.Schol, self.log_pdet))
[docs] def mean(self): """ return the mean of the distribution Returns ------- ndarray (nx,) distribution mean """ return np.sum(self.w[:, np.newaxis] * self.m, axis=0)
[docs] def cov(self): """Compute the covariance matrix of the distribution. Returns ------- ndarray (nx, nx) full covariance matrix of the distribution """ # Compute the mean of the distribution mean = self.mean() w = copy(self.w) w = w / np.sum(w) # Extract individual covariances covs = self.P # Compute the difference between each mixand mean and the distribution mean diffs = self.m - mean # shape (nC, nx) # Compute the outer product of diffs for each sample outer_diffs = diffs[:, :, None] * diffs[:, None, :] # shape (nC, nx, nx) # Weighted sum of covariances and outer products return np.sum(w[:, None, None] * (covs + outer_diffs), axis=0)
[docs] def cdf(self, x, allow_singular=False): """ evaluate GM CDF at points x""" x = np.atleast_2d(x) if x.shape[-1] != self.m.shape[-1]: raise ValueError( 'The last dimension of x must match the state dimension.') return np.sum([self.w[i]*multivariate_normal.cdf(x, self.m[i], self._cov[i], allow_singular=allow_singular) for i in range(len(self))], axis=0)
[docs] def comp_bounds(self, sigma_mult=3): return comp_bounds(self.m, self.P, sigma_mult)
[docs] def marginal_2d(self, dimensions=(0, 1)): """ return 2D marginalized GM """ return GaussianMixture(self.w, *marginal_2d(self.m, self.P, dimensions))
[docs] def marginal_nd(self, dimensions): """ return nD marginalization of GM over specified dimensions """ return GaussianMixture(self.w, *marginal_nd(self.m, self.P, dimensions))
[docs] def pdf_2d(self, dimensions=(0, 1), res=100, xbnd=None, ybnd=None): """ evaluate GM pdf in two dimensions Optional -------- dimensions: tuple dimensions to evaluate pdf over. Defaults to (0, 1) res: int resolution of the grid. Defaults to 100 xbnd: tuple x-axis bounds. Determined automatically if not provided ybnd: tuple y-axis bounds. Determined automatically if not provided Returns ------- ndarray (res, res) pdf values ndarray (res, res) x values ndarray (res, res) y values """ return gm_pdf_2d(self.w, self.m, self.P, dimensions, res, xbnd, ybnd)
[docs] def pop(self, idx): """ remove and return component by index Required -------- idx: int index of component to remove Returns ------- tuple (weight, mean, covariance) of removed component """ w, m, P = self[idx] self._w = np.delete(self._w, idx, 0) self._m = np.delete(self._m, idx, 0) self._cov = np.delete(self._cov, idx, 0) return w, m, P
[docs] def rvs(self, size=None, random_state=None): """ Gaussian Mixture generated random variates Parameters ---------- size : int, optional Defining number of random variates (Default is 1) random_state : int, numpy.random.Generator, numpy.random.RandomState, optional Random state for reproducibility. Can be: - An integer seed - A numpy.random.Generator instance - A numpy.random.RandomState instance - None (default) for non-deterministic results Returns ------- ndarray Random variates Notes ----- If size is not specified, a single (possibly-vector) random variate is returned. If size is specified, an ndarray of random variates is returned, even for size=1. """ # Handle random_state parameter if random_state is None: rng = np.random.default_rng() elif isinstance(random_state, (int, np.integer)): rng = np.random.default_rng(random_state) elif isinstance(random_state, np.random.Generator): rng = random_state elif isinstance(random_state, np.random.RandomState): # For backward compatibility with old numpy API rng = random_state else: raise ValueError("random_state must be an int, Generator, RandomState, or None") sort_idx = np.argsort(-self._w) # sort and normalize weight w_sorted = self._w[sort_idx]/np.sum(self._w) cum_weights = np.cumsum(w_sorted) S_sorted = self.Schol[sort_idx] if size is None: # Generate uniform random number for component selection u = rng.random() if isinstance(rng, np.random.Generator) else rng.rand() comp_idx = np.where(u < cum_weights)[0][0] # Generate normal random variates z = rng.standard_normal(self._msize) if isinstance(rng, np.random.Generator) else rng.randn(self._msize) return self.m[sort_idx][comp_idx] + S_sorted[comp_idx] @ z else: # Generate uniform random numbers for component selection u = rng.random(size) if isinstance(rng, np.random.Generator) else rng.rand(size) comp_idx = [np.where(u[i] < cum_weights)[0][0] for i in range(size)] # Generate normal random variates samples = [] for c in comp_idx: z = rng.standard_normal(self._msize) if isinstance(rng, np.random.Generator) else rng.randn(self._msize) samples.append(self.m[sort_idx][c] + S_sorted[c] @ z) return _squeeze_output(np.array(samples))
size = property(get_size, _not_allowed) w = property(get_w, set_w) m = property(get_m, set_m) P = property(get_P, set_P) dim = property(get_msize, set_msize)
[docs] class GaussSplitOptions(object):
[docs] def __init__( self, L=3, lam=1e-3, recurse_depth=np.inf, min_weight=1e-3, state_idxs=(0, 1), spectral_direction_only=False, variance_preserving=True ): """ Options for FoV splitting Parameters ---------- L: int, optional number of components to spit Gaussian into. Default is 3 lam: float, optional Gauss split optimization regularization parameter. Lower values result in less component spread and larger component covariances. This value should be decreased as L increases. Default is 1e-3 recurse_depth: int, optional maximum recursion depth. Default is inf min_weight: float, optional smallest component weight that is considered for splitting. Default is 1e-3 state_idxs: tuple optional (2,) indices of state vector corresponding to the FoV space. Default is (0,1) spectral_direction_only: bool, optional if True, only use spectral direction for splitting. Default is False variance_preserving: bool, optional if True, preserve variance in splitting. Default is True """ self.L = L self.lam = lam self.recurse_depth = recurse_depth self.min_weight = min_weight self.state_idxs = np.array(state_idxs) self.spectral_direction_only = spectral_direction_only self.variance_preserving = variance_preserving