from __future__ import print_function, division
import logging
import sys
import numpy as np
import sklearn.linear_model
from .yatsm import YATSM
from ..accel import try_jit
from ..errors import TSLengthException
from ..masking import smooth_mask, multitemp_mask
from ..regression.diagnostics import rmse
# Setup
logger = logging.getLogger('yatsm_algo')
@try_jit(nopython=True)
def _monitor_calc_scores(X, Y, here, scores, predictions, rmse,
test_indices, min_rmse):
""" Calculate monitoring period scaled residuals
"""
for i in range(scores.shape[1]):
for i_b, b in enumerate(test_indices):
scores[i_b, i] = (
(Y[b, here + i] - predictions[i_b, i]) /
max(min_rmse[b], rmse[i_b])
)
[docs]class CCDCesque(YATSM):
"""Initialize a CCDC-like model for data X (spectra) and Y (dates)
An unofficial and unvalidated port of the Continuous Change Detection and
Classification (CCDC) algorithm by Zhu and Woodcock, 2014.
Args:
test_indices (numpy.ndarray): Test for changes with these
indices of ``Y``. If not provided, all series in ``Y`` will be used
as test indices
estimator (dict): dictionary containing estimation model from
``scikit-learn`` used to fit and predict timeseries and,
optionally, a dict of options for the estimation model ``fit``
method (default: ``{'object': Lasso(alpha=20), 'fit': {}}``)
consecutive (int): Consecutive observations to trigger change
threshold (float): Test statistic threshold for change
min_obs (int): Minimum observations in model
min_rmse (float): Minimum RMSE for models during testing
retrain_time (float): Number of days between model fit updates during
monitoring period
screening (str): Style of prescreening of the timeseries for noise.
Options are 'RLM' or 'LOWESS' (default: RLM)
screening_crit (float): critical value for multitemporal
noise screening (default: 400.0)
remove_noise (bool): Remove observation if change is not
detected but first observation is above threshold (if it looks like
noise) (default: True)
green_band (int): Index of green band in ``Y`` for
multitemporal masking (default: 1)
swir1_band (int): Index of first SWIR band in ``Y`` for
multitemporal masking (default: 4)
dynamic_rmse (bool): Vary RMSE as a function of day of year
(default: False)
slope_test (float or bool): Use an additional slope test to
assess the suitability of the training period. A value of True
enables the test and uses the `threshold` parameter as the test
criterion. False turns off the test or a float value enables the
test but overrides the test criterion threshold. (default: False)
idx_slope (int): if ``slope_test`` is enabled, provide index of ``X``
containing slope term (default: 1)
.. document private functions
.. automethod:: _get_dynamic_rmse
.. automethod:: _get_model_rmse
"""
ndays = 365.25
def __init__(self,
test_indices=None,
estimator={'object': sklearn.linear_model.Lasso(alpha=20),
'fit': {}},
consecutive=5, threshold=2.56, min_obs=None, min_rmse=None,
retrain_time=365.25, screening='RLM', screening_crit=400.0,
remove_noise=True, green_band=1, swir1_band=4,
dynamic_rmse=False, slope_test=False, idx_slope=1,
**kwargs):
# Parent sets up test_indices and lm
super(CCDCesque, self).__init__(test_indices, estimator, **kwargs)
# Store model hyperparameters
self.consecutive = consecutive
self.threshold = threshold
self.min_obs = min_obs or 16
self.min_rmse = min_rmse
self.retrain_time = retrain_time
# Define screening method according to type
if screening == 'RLM':
self.screen_timeseries = self._screen_timeseries_RLM
logger.debug('Using RLM for screening')
elif screening == 'LOWESS':
self.screen_timeseries = self._screen_timeseries_LOWESS
logger.debug('Using LOWESS for screening')
else:
raise TypeError('Unknown screening type %s' % screening)
self.screening_crit = screening_crit
self.remove_noise = remove_noise
self.green_band = green_band
self.swir1_band = swir1_band
self.slope_test = slope_test
if self.slope_test is True:
self.slope_test = threshold
self.idx_slope = idx_slope
if dynamic_rmse:
self.get_rmse = self._get_dynamic_rmse
else:
self.get_rmse = self._get_model_rmse
@property
def record_template(self):
""" YATSM record template for features in X and series in Y
Record template will set `px` and `py` if defined as class attributes.
Otherwise `px` and `py` coordinates will default to 0.
Returns:
numpy.ndarray: NumPy structured array containing a template of a
YATSM record
"""
record_template = np.zeros(1, dtype=[
('start', 'i4'),
('end', 'i4'),
('break', 'i4'),
('coef', 'float32', (self.n_features, self.n_series)),
('rmse', 'float32', (self.n_series)),
('magnitude', 'float32', self.n_series),
('px', 'u2'),
('py', 'u2')
])
record_template['px'] = self.px
record_template['py'] = self.py
return record_template
# HELPER PROPERTIES
@property
def span_time(self):
""" Return time span (in days) between start and end of model """
return abs(self.dates[self.here] - self.dates[self.start])
@property
def span_index(self):
""" Return time span (in index) between start and end of model """
return (self.here - self.start)
@property
def running(self):
""" Determine if timeseries can run """
return self.here < len(self.dates)
@property
def can_monitor(self):
""" Determine if timeseries can monitor the future consecutive obs """
return self.here < len(self.dates) - self.consecutive - 1
# MAIN LOOP
[docs] def fit(self, X, Y, dates):
""" Fit timeseries model
Args:
X (numpy.ndarray): design matrix (number of observations x number
of features)
Y (numpy.ndarray): independent variable matrix (number of series x
number of observations)
dates (numpy.ndarray): ordinal dates for each observation in X/Y
Returns:
numpy.ndarray: NumPy structured array containing timeseries
model attribute information
"""
if len(dates) != X.shape[0] or len(dates) != Y.shape[1]:
raise ValueError('X/Y/dates must have same number of observations')
self.X = np.asarray(X, dtype=np.float64)
self.Y = np.asarray(Y, dtype=np.float64)
self.dates = dates
self.n_features = X.shape[1]
self.n_series = Y.shape[0]
# Setup test indices
if not np.any(np.asarray(self.test_indices)):
self.test_indices = np.arange(self.n_series)
# Setup minimum RMSE
if isinstance(self.min_rmse, (list, np.ndarray)):
self.min_rmse = np.asarray(self.min_rmse)
elif isinstance(self.min_rmse, (int, float)):
self.min_rmse = np.array([self.min_rmse] * self.n_series)
else:
self.min_rmse = np.array([sys.float_info.min] * self.n_series)
# Set or reset state variables
self.reset()
if len(dates) < self.here + self.consecutive:
raise TSLengthException('Not enough observations (n = %s)' %
len(dates))
self.n_record = 0
self.record = np.copy(self.record_template)
while self.running:
while not self.monitoring and self.can_monitor:
self.train()
self.here += 1
# Ensure all bands are fit in case we can't monitor
# First check if there are enough obs to estimate model
if self.span_index > self.n_features:
self._update_model()
while self.monitoring and self.can_monitor:
# Update model if required
self._update_model()
# Perform monitoring check
self.monitor()
# Iterate forward
self.here += 1
self.here += 1
# Update record for last model
self.record[self.n_record]['start'] = self.dates[self.start]
# Re-adjust end for consecutive, and for two ``self.here += 1`` calls
offset = 1 + (1 if self.monitoring else 0)
self.record[self.n_record]['end'] = self.dates[
self.here - self.consecutive - offset]
for i, m in enumerate(self.models):
self.record[self.n_record]['coef'][:, i] = m.coef
self.record[self.n_record]['rmse'][i] = m.rmse
# If we ended without being able to monitor again, delete last model
# since it will be empty
# TODO: fit this time period with median
if not self.monitoring:
self.record = self.record[:-1]
return self.record
[docs] def reset(self):
""" Reset state information required for model fittings
"""
# Location information
self.start = 0
self.here = self.min_obs
self._here = self.here
self.trained_date = 0
self.monitoring = False
# Populate prediction models
if len(self.models) == 0:
self.models = np.array([sklearn.clone(self.estimator) for
i in range(self.n_series)])
for m in self.models: # initialize additional attributes
m.rmse = 0.0
m.coef = np.zeros(self.X.shape[1])
# Training period test calculations
self.start_resid = np.zeros(len(self.test_indices))
self.end_resid = np.zeros(len(self.test_indices))
self.slope_resid = np.zeros(len(self.test_indices))
# Monitoring period calculations
self.predictions = np.zeros((len(self.test_indices), self.consecutive),
dtype=np.float64)
self.scores = np.zeros((len(self.test_indices), self.consecutive),
dtype=np.float64)
[docs] def train(self):
""" Train time series model if stability criteria are met
Stability criteria (Equation 5 in Zhu and Woodcock, 2014) include a
test on the change in reflectance over the training period (slope test)
and a test on the magnitude of the residuals for the first and last
observations in the training period. Training periods with large slopes
can indicate that a disturbance process is still in progress. Large
residuals on the first or last observations have high leverage on the
estimated regression and should be excluded from the training period.
1. Slope test:
.. math::
\\frac{1}{n}\sum\limits_{b\in B_{test}}\\frac{
\left|\\beta_{slope,b}(t_{end}-t_{start})\\right|}
{RMSE_b} > T_{crit}
2. First and last residual tests:
.. math::
\\frac{1}{n}\sum\limits_{b\in B_{test}}\\frac{
\left|\hat\\rho_{b,i=1} - \\rho_{b,i=1}\\right|}
{RMSE_b} > T_{crit}
\\frac{1}{n}\sum\limits_{b\in B_{test}}\\frac{
\left|\hat\\rho_{b,i=N} - \\rho_{b,i=N}\\right|}
{RMSE_b} > T_{crit}
"""
# Test if we can train yet
if self.span_time <= self.ndays or self.span_index < self.n_features:
logger.debug('Could not train - moving forward')
return
# Check if screening was OK
if not self.screen_timeseries():
return
# Test if we can still run after noise removal
if self.here >= self._X.shape[0]:
raise TSLengthException('Not enough observations to proceed '
'after noise removal')
# After noise removal, try to fit models
self.fit_models(self._X[self.start:self.here + 1, :],
self._Y[:, self.start:self.here + 1],
bands=self.test_indices)
# Ensure first and last points aren't unusual
for i, b in enumerate(self.test_indices):
m = self.models[b]
_rmse = max(self.min_rmse[b], m.rmse)
self.start_resid[i] = (
np.abs(self._Y[b, self.start] -
m.predict(self._X[self.start, :][None, :])) /
_rmse)
self.end_resid[i] = (
np.abs(self._Y[b, self.here] -
m.predict(self._X[self.here, :][None, :])) /
_rmse)
self.slope_resid[i] = (
np.abs(m.coef_[self.idx_slope] * (self.here - self.start)) /
_rmse)
test_start = np.linalg.norm(self.start_resid)
test_end = np.linalg.norm(self.end_resid)
test_slope = np.linalg.norm(self.slope_resid)
if (test_start > self.threshold or test_end > self.threshold or
(self.slope_test and test_slope > self.threshold)):
logger.debug('Training period unstable')
self.start += 1
self.here = self._here
return
self.X = self._X
self.Y = self._Y
self.dates = self._dates
logger.debug('Entering monitoring period')
self.monitoring = True
[docs] def monitor(self):
""" Monitor for changes in time series
The test criteria for CCDC can be represented as:
.. math::
\\sum_{i=0}^{\\color{red}{consec}}
I \\left(
\\sqrt{
\\sum_{b\in \\color{red}{B_{test}}}
\\left(
\\frac
{\hat\\rho_{b,i} - \\rho_{b,i}}
{{RMSE}_b^{\\color{red}{*}}}
\\right)
^2
} > \\color{red}{T_{crit}}
\\right)
> \\color{red}{consec}
where the symbols in red are model hyperparameters:
* :math:`\\color{red}{consec}`:
:paramref:`consecutive <.CCDCesque.consecutive>`
* :math:`\\color{red}{B_{test}}`:
:paramref:`test_indices <.CCDCesque.test_indices>`
* :math:`\\color{red}{T_{crit}}`:
:paramref:`threshold <.CCDCesque.threshold>`
* :math:`{RMSE}_b^{\\color{red}{*}}` depends on:
* :paramref:`dynamic_rmse <.CCDCesque.dynamic_rmse>`:
* True: :func:`~_get_dynamic_rmse` is used for RMSE
* False: :func:`~_get_model_rmse` is used for RMSE
* :paramref:`min_rmse <.CCDCesque.min_rmse>`
* If a `float` or `int` is given, override RMSE estimate
if estimate is smaller than
:paramref:`min_rmse <.CCDCesque.min_rmse>`
If :paramref:`remove_noise <.CCDCesque.remove_noise>` is `True`,
the first of ``consecutive`` observations will be removed if first
scaled residual is above ``threshold`` but not all ``consecutive``
scaled residuals exceed ``threshold``.
"""
_rmse = self.get_rmse()
for idx, model in enumerate(self.models[self.test_indices]):
self.predictions[idx, :] = model.predict(
self.X[self.here:self.here + self.consecutive, :])
_monitor_calc_scores(self.X, self.Y, self.here,
self.scores,
self.predictions, _rmse,
self.test_indices,
self.min_rmse)
# Check for scores above critical value
mag = np.linalg.norm(self.scores, axis=0)
if np.all(mag > self.threshold):
logger.debug('CHANGE DETECTED')
# Update record for last model
self.record[self.n_record]['start'] = self.dates[self.start]
self.record[self.n_record]['end'] = self.dates[self.here]
self.record[self.n_record]['break'] = self.dates[self.here + 1]
for i, m in enumerate(self.models):
self.record[self.n_record]['coef'][:, i] = m.coef
self.record[self.n_record]['rmse'][i] = m.rmse
# Record magnitude of difference for tested indices
self.record[self.n_record]['magnitude'][self.test_indices] = \
np.mean(self.scores, axis=1)
self.record = np.append(self.record, self.record_template)
self.n_record += 1
# Reset _X and _Y for re-training
self._X = self.X
self._Y = self.Y
self.start = self.here + 1
self.trained_date = 0
self.monitoring = False
elif mag[0] > self.threshold and self.remove_noise:
# Masking way of deleting is faster than `np.delete`
m = np.ones(self.X.shape[0], dtype=bool)
m[self.here] = False
self.X = self.X[m, :]
self.Y = self.Y[:, m]
self.dates = self.dates[m]
self.here -= 1
# MODEL FITTING UTILITIES
def _update_model(self):
# Only train if enough time has past
if (abs(self.dates[self.here] - self.trained_date) >
self.retrain_time):
logger.debug('Monitoring - retraining (%s days since last)' %
str(self.dates[self.here] - self.trained_date))
# Fit timeseries models
self.fit_models(self.X[self.start:self.here + 1, :],
self.Y[:, self.start:self.here + 1])
self.trained_date = self.dates[self.here]
# MULTITEMP SCREENING
def _screen_timeseries_LOWESS(self, span=None):
""" Screen entire dataset for noise before training using LOWESS
Args:
span (int): span for LOWESS
Returns:
bool: True if timeseries is screened and we can train, else False
"""
if not self.screened:
span = span or self.consecutive * 2 + 1
mask = smooth_mask(self.dates, self.Y, span,
crit=self.screening_crit,
green=self.green_band, swir1=self.swir1_band)
# Apply mask to X and Y
self.X = self.X[mask, :]
self.Y = self.Y[:, mask]
# Also apply to _X and _Y for training purposes
self._X = self.X
self._Y = self.Y
self.screened = True
return True
def _screen_timeseries_RLM(self):
""" Screen training period for noise with IRWLS RLM
Returns:
bool: True if timeseries is screened and we can train, else False
"""
# Multitemporal noise removal
mask = np.ones(self.X.shape[0], dtype=np.bool)
index = np.arange(self.start, self.here + self.consecutive,
dtype=np.uint16)
mask[index] = multitemp_mask(self.dates[index],
self.Y[:, index],
self.span_time / self.ndays,
crit=self.screening_crit,
green=self.green_band,
swir1=self.swir1_band)
# Check if there are enough observations for model with noise removed
_span_index = mask[index][:-self.consecutive].sum()
# Return if not enough observations
if _span_index < self.min_obs:
logger.debug(' multitemp masking - not enough obs')
return False
# There is enough observations in train period to fit - remove noise
self._X = self.X[mask, :]
self._Y = self.Y[:, mask]
self._dates = self.dates[mask]
# record our current position
# important for next iteration of noise removal
self._here = self.here
# Go forward after noise removal
self.here = self.start + _span_index - 1
if self.span_time < self.ndays:
logger.debug(' multitemp masking - not enough time')
self.here = self._here
return False
logger.debug('Updated "here"')
return True
# RMSE CALCULATION
[docs] def _get_model_rmse(self):
""" Return the normal RMSE of each fitted model
Returns:
numpy.ndarray: RMSE of each tested model
"""
return np.array([m.rmse for m in self.models])[self.test_indices]
[docs] def _get_dynamic_rmse(self):
""" Return the dynamic RMSE for each model
Dynamic RMSE refers to the Root Mean Squared Error calculated using
`self.min_obs` number of observations closest in day of year to the
observation `self.consecutive` steps into the future. Goal is to
reduce false-positives during seasonal transitions (high variance in
the signal) while decreasing omission during stable times of year.
Returns:
numpy.ndarray: dynamic RMSE of each tested model
"""
# Indices of closest observations based on DOY
i_doy = np.argsort(
np.mod(self.dates[self.start:self.here] -
self.dates[self.here + self.consecutive],
self.ndays))[:self.min_obs]
_rmse = np.zeros(len(self.test_indices), np.float32)
_X = self.X.take(i_doy, axis=0)
for i_b, b in enumerate(self.test_indices):
m = self.models[b]
_rmse[i_b] = rmse(self.Y[b, :].take(i_doy), m.predict(_X))
return _rmse