Source code for turbustat.statistics.lm_seg

# Licensed under an MIT open source license - see LICENSE
from __future__ import print_function, absolute_import, division

'''
Port of the R seg.lm.fit function.
From the pysegmented package: https://github.com/e-koch/pysegmented

The MIT License (MIT)

Copyright (c) 2014 Eric Koch

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

import statsmodels.api as sm
import numpy as np
import warnings
from copy import copy


[docs] class Lm_Seg(object): """ """ def __init__(self, x, y, brk, weights=None): super(Lm_Seg, self).__init__() self.x = x self.y = y self.brk = brk if not np.isfinite(self.brk): raise ValueError("brk must be a finite value.") # Make sure the starting break point is in range of the data if self.x.max() < self.brk or self.x.min() > self.brk: raise ValueError("brk is outside the range in x.") if weights is not None: if x.size != weights.size: raise ValueError("weights must have the same shape as x and y") if not np.isfinite(weights).all(): raise ValueError("weights should all be finite values.") self.weights = weights # Check for nans, infs... good_vals = np.logical_and(np.isfinite(self.y), np.isfinite(self.x)) self.y = self.y[good_vals] self.x = self.x[good_vals] if self.x.size <= 3 or self.y.size <= 3: raise Warning("Not enough finite points to fit.")
[docs] def fit_model(self, tol=1e-3, iter_max=100, h_step=2.0, epsil_0=10, constant=True, verbose=True, missing='drop', **fit_kwargs): ''' ''' # Fit a normal linear model to the data if constant: x_const = sm.add_constant(self.x) else: x_const = self.x if self.weights is None: model = sm.OLS(self.y, x_const, missing=missing) else: model = sm.WLS(self.y, x_const, weights=self.weights, missing=missing) init_lm = model.fit(**fit_kwargs) if verbose: print(init_lm.summary()) epsil = epsil_0 # Before we get into the loop, make sure that this was a bad fit if epsil_0 < tol: warnings.warning('Initial epsilon is smaller than tolerance. \ The tolerance should be set smaller.') return init_lm # Sum of residuals dev_0 = np.sum(init_lm.resid**2.) # Catch cases where a break isn't necessary self.break_fail_flag = False # Count it = 0 # Now loop through and minimize the residuals by changing where the # breaking point is. while np.abs(epsil) > tol: U = (self.x - self.brk) * (self.x > self.brk) V = deriv_max(self.x, self.brk) X_all = np.vstack([self.x, U, V]).T if constant: X_all = sm.add_constant(X_all) if self.weights is None: model = sm.OLS(self.y, X_all, missing=missing) else: model = sm.WLS(self.y, X_all, weights=self.weights, missing=missing) fit = model.fit() beta = fit.params[2] # Get coef gamma = fit.params[3] # Get coef # Adjust the break point new_brk = copy(self.brk) new_brk += (h_step * gamma) / beta # If the new break point is outside of the allowed range, reset # the step size to half of the original, then try stepping again h_it = 0 if not (self.x > new_brk).any() or (self.x > new_brk).all(): while True: # Remove step taken new_brk -= (h_step * gamma) / beta # Now half the step and try again. h_step /= 2.0 new_brk += (h_step * gamma) / beta h_it += 1 if (self.x > new_brk).any() and not (self.x > new_brk).all(): self.brk = new_brk break if h_it >= 5: self.break_fail_flag = True it = iter_max + 1 warnings.warn("Cannot find good step-size, assuming\ break not needed") break else: self.brk = new_brk dev_1 = np.sum(fit.resid**2.) epsil = (dev_1 - dev_0) / (dev_0 + 1e-3) dev_0 = dev_1 if verbose: print("Iteration: %s/%s" % (it + 1, iter_max)) print(fit.summary()) print("Break Point: " + str(self.brk)) print("Epsilon: " + str(epsil)) it += 1 if it > iter_max: warnings.warn("Max iterations reached. \ Result may not be minimized.") break # Is the initial model without a break better? if self.break_fail_flag or np.sum(init_lm.resid**2) <= np.sum(fit.resid**2): # If the initial fit was better, the segmented fit failed. self.break_fail_flag = True self.brk = self.x.max() X_all = sm.add_constant(self.x) else: # With the break point hopefully found, do a final good fit U = (self.x - self.brk) * (self.x > self.brk) V = deriv_max(self.x, self.brk) X_all = np.vstack([self.x, U, V]).T X_all = sm.add_constant(X_all) if self.weights is None: model = sm.OLS(self.y, X_all, missing=missing) else: model = sm.WLS(self.y, X_all, weights=self.weights, missing=missing) self.fit = model.fit() self._params = self.fit.params self._errs = self.fit.bse if not self.break_fail_flag: self.brk_err = brk_errs(self.params, fit.cov_params()) else: self.brk_err = 0.0 self.get_slopes()
[docs] def model(self, x=None, model_return=False): p = self.params trans_pt = np.abs(x - self.brk).argmin() mod_eqn = lambda k: p[0] + p[1] * k * (k < self.brk) + \ ((p[1] + p[2]) * k + (-p[2]) * k[trans_pt]) * (k >= self.brk) if model_return or x is None: return mod_eqn return mod_eqn(x)
[docs] def get_slopes(self): ''' ''' # Deal with non-break case if self.break_fail_flag: self._slopes = np.asarray([self.params[1]]) self._slope_errs = np.asarray([self.param_errs[1]]) return self n_slopes = self.params[1:-2].shape[0] self._slopes = np.empty(n_slopes) self._slope_errs = np.empty(n_slopes) for s in range(n_slopes): if s == 0: self._slopes[s] = self.params[s + 1] self._slope_errs[s] = self.param_errs[s + 1] else: self._slopes[s] = self.params[s + 1] + self._slopes[:s] self._slope_errs[s] = \ np.sqrt(self.param_errs[s + 1] ** 2 + self._slope_errs[:s]**2)
@property def slopes(self): return self._slopes @property def slope_errs(self): return self._slope_errs @property def params(self): return np.append(self._params, self.brk) @property def param_errs(self): return np.append(self._errs, self.brk_err)
[docs] def plot(self, x, show_data=True): ''' ''' import matplotlib.pyplot as p if show_data: p.plot(self.x, self.y, 'bD') p.plot(x, self.model(x), 'g') p.grid(True) p.show()
def deriv_max(a, b, pow=1): if pow == 1: dum = -1 * np.ones(a.shape) dum[a < b] = 0 return dum else: return -pow * np.max(a - b, axis=0) ** (pow - 1) def brk_errs(params, cov): ''' Given the covariance matrix of the fits, calculate the standard error on the break. ''' # Var gamma term1 = cov[3, 3] # Var beta * (beta/gamma)^2` term2 = cov[2, 2] * (params[3] / params[2])**2. # Correlation b/w gamma and beta term3 = 2 * cov[3, 2] * (params[3] / params[2]) return np.sqrt(term1 + term2 + term3)