# -*- coding: utf-8 -*-
"""
.. module:: pyxpcm
:synopsis: Profile Classification Model
.. moduleauthor:: Guillaume Maze <gmaze@ifremer.fr>
Multi-variables classification, ie use of more than physical variable as PCM features
Created on 2019/09/27
@author: G. Maze (Ifremer/LOPS)
"""
import os
import sys
import numpy as np
import pandas as pd
import xarray as xr
import collections
import inspect
import dask
# import warnings
import time
from contextlib import contextmanager
from datetime import datetime
# Internal:
from .plot import _PlotMethods
from .stat import _StatMethods
from .utils import LogDataType, Vertical_Interpolator, NoTransform, StatisticsBackend, docstring
from . import io
# Scikit-learn useful methods:
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline
from sklearn.utils import validation
from sklearn.utils import assert_all_finite
from sklearn.exceptions import NotFittedError
####### Scikit-learn statistic backend:
# http://scikit-learn.org/stable/modules/mixture.html
from sklearn.mixture import GaussianMixture
class PCMFeatureError(Exception):
"""Exception raised when features not correct"""
class PCMClassError(Exception):
"""Exception raised when classes not correct"""
[docs]class pcm(object):
"""Profile Classification Model class constructor
Consume and return :mod:`xarray` objects
"""
[docs] def __init__(self,
K:int,
features:dict(),
scaling=1,
reduction=1, maxvar=15,
classif='gmm', covariance_type='full',
verb=False,
debug=False,
timeit=False, timeit_verb=False,
chunk_size='auto',
backend='sklearn'):
"""Create the PCM instance
Parameters
----------
K: int
The number of class, or cluster, in the classification model.
features: dict()
The vertical axis to use for each features.
eg: {'temperature':np.arange(-2000,0,1)}
scaling: int (default: 1)
Define the scaling method:
- 0: No scaling
- **1: Center on sample mean and scale by sample std**
- 2: Center on sample mean only
reduction: int (default: 1)
Define the dimensionality reduction method:
- 0: No reduction
- **1: Reduction using :class:`sklearn.decomposition.PCA`**
maxvar: float (default: 99.9)
Maximum feature variance to preserve in the reduced dataset using :class:`sklearn.decomposition.PCA`. In %.
classif: str (default: 'gmm')
Define the classification method.
The only method available as of now is a Gaussian Mixture Model.
See :class:`sklearn.mixture.GaussianMixture` for more details.
covariance_type: str (default: 'full')
Define the type of covariance matrix shape to be used in the default classifier GMM.
It can be ‘full’ (default), ‘tied’, ‘diag’ or ‘spherical’.
verb: boolean (default: False)
More verbose output
timeit: boolean (default: False)
Register time of operation for performance evaluation
timeit_verb: boolean (default: False)
Print time of operation during execution
chunk_size: 'auto' or int
Sampling chunk size, (array of features after pre-processing)
backend: str
Statistic library backend, 'sklearn' (default) or 'dask_ml'
"""
if K==0:
raise PCMClassError("Can't create a PCM with K=0")
if K is None:
raise PCMClassError("K must be defined to create a PMC")
if not bool(features):
raise PCMFeatureError("Can't create a PCM without features")
if scaling==0: with_scaler = 'none'; with_mean=False; with_std = False
elif scaling==1: with_scaler = 'normal'; with_mean=True; with_std = True
elif scaling==2: with_scaler = 'center'; with_mean=True; with_std = False
else: raise NameError('scaling must be 0, 1 or 2')
if reduction==0: with_reducer = False
elif reduction==1: with_reducer = True
else: raise NameError('reduction must be 0 or 1')
if classif=='gmm': with_classifier = 'gmm';
else: raise NameError("classifier must be 'gmm' (no other methods implemented at this time)")
#todo check validity of the dict of features
self._props = {'K': int(K),
'F': len(features),
'llh': None,
'COVARTYPE': covariance_type,
'with_scaler': with_scaler,
'with_reducer': with_reducer,
'with_classifier': with_classifier,
'maxvar': maxvar,
'features': collections.OrderedDict(features),
'chunk_size': chunk_size,
'backend': backend}
self._xmask = None # xarray mask for nd-array used at pre-processing steps
self._register = collections.OrderedDict() # Will register mutable instances of sub-modules like 'plot'
self._verb = verb #todo _verb is a property, should be set/get with a decorator
self._debug = debug
self._interpoler = collections.OrderedDict()
self._scaler = collections.OrderedDict()
self._scaler_props = collections.OrderedDict()
self._reducer = collections.OrderedDict()
self._homogeniser = collections.OrderedDict()
# Load estimators for a specific backend:
bck = StatisticsBackend(backend, scaler='StandardScaler', reducer='PCA')
for feature_name in features:
feature_axis = self._props['features'][feature_name]
if isinstance(feature_axis, xr.DataArray):
self._props['features'][feature_name] = feature_axis.values
# self._scaler[feature_name] = preprocessing.StandardScaler(with_mean=with_mean,
# with_std=with_std)
if 'none' not in self._props['with_scaler']:
self._scaler[feature_name] = bck.scaler(with_mean=with_mean, with_std=with_std)
else:
self._scaler[feature_name] = NoTransform()
self._scaler_props[feature_name] = {'units': '?'}
is_slice = np.all(feature_axis == None)
if not is_slice:
self._interpoler[feature_name] = Vertical_Interpolator(axis=feature_axis, debug=self._debug)
if np.prod(feature_axis.shape) == 1:
# Single level: no need to reduce
if self._debug: print('Single level, not need to reduce', np.prod(feature_axis.ndim))
self._reducer[feature_name] = NoTransform()
else:
# Multi-vertical-levels, set reducer:
if with_reducer:
self._reducer[feature_name] = bck.reducer(n_components=self._props['maxvar'],
svd_solver='full')
else:
self._reducer[feature_name] = NoTransform()
else:
self._interpoler[feature_name] = NoTransform()
self._reducer[feature_name] = NoTransform()
if self._debug: print("%s is single level, no need to reduce" % feature_name)
self._homogeniser[feature_name] = {'mean': 0, 'std': 1}
self._classifier = GaussianMixture(n_components=self._props['K'],
covariance_type=self._props['COVARTYPE'],
init_params='kmeans',
max_iter=1000,
tol=1e-6)
# Define the "context" to execute some functions inner code
# (useful for time benchmarking)
self._context = self.__empty_context # Default is empty, do nothing
self._context_args = dict()
if timeit:
self._context = self.__timeit_context
self._context_args = {'maxlevel': 3, 'verb':timeit_verb}
self._timeit = dict()
# Define statistics for the fit method:
self._fit_stats = dict({'datetime': None, 'n_samples_seen_': None, 'score': None, 'etime': None})
@contextmanager
def __timeit_context(self, name, opts=dict()):
default_opts = {'maxlevel': np.inf, 'verb':False}
for key in opts:
if key in default_opts:
default_opts[key] = opts[key]
level = len([i for i in range(len(name)) if name.startswith('.', i)])
if level <= default_opts['maxlevel']:
startTime = time.time()
yield
elapsedTime = time.time() - startTime
trailingspace = " " * level
trailingspace = " "
if default_opts['verb']:
# print('... time in {} {}: {} ms'.format(trailingspace, name, int(elapsedTime * 1000)))
print('{} {}: {} ms'.format(trailingspace, name, int(elapsedTime * 1000)))
if name in self._timeit:
self._timeit[name].append(elapsedTime * 1000)
else:
self._timeit[name] = list([elapsedTime*1000])
else:
yield
@contextmanager
def __empty_context(self, name, *args, **kargs):
yield
def __call__(self, **kwargs):
self.__init__(**kwargs)
def __iter__(self):
self.__i = 0
return self
def __next__(self):
if self.__i < self.K:
i = self.__i
self.__i += 1
return i
else:
raise StopIteration()
def __repr__(self):
return self.display(deep=self._verb)
[docs] def ravel(self, da, dim=None, feature_name=str):
""" Extract from N-d array a X(feature,sample) 2-d array and vertical dimension z
Parameters
----------
da: :class:`xarray.DataArray`
The DataArray to process
dim: str
Name of the vertical dimension in the input :class:`xarray.DataArray`
feature_name: str
Target PCM feature name for the input :class:`xarray.DataArray`
Returns
-------
X: :class:`xarray.DataArray`
A new DataArray with dimension ['n_sampling','n_features']
Note that data are always :class:`dask.array.Array`.
z: :class:`numpy.array`
The vertical axis of data
sampling_dims: dict()
Dictionary where keys are :class:`xarray.Dataset` variable names of features
and values are another dictionary with the list of sampling dimension in
``DIM_SAMPLING`` key and the name of the vertical axis in the ``DIM_VERTICAL`` key.
Examples
--------
This function is meant to be used internally only
__author__: gmaze@ifremer.fr
"""
# Is this a thick array or a slice ?
is_slice = np.all(self._props['features'][feature_name] == None)
# Load mask where all features are available for this PCM:
mask_stacked = self._xmask
if is_slice:
# No vertical dimension to use, simple stacking
sampling_dims = list(da.dims)
# Apply all-features mask:
X = da.stack({'sampling': sampling_dims})
X = X.where(mask_stacked == 1, drop=True).expand_dims('dummy').transpose()#.values
z = np.empty((1,))
else:
if not dim:
# Try to infer the vertical dimension name looking for the CF 'axis' attribute in all dimensions
dim_found = False
for this_dim in da.dims:
if ('axis' in da[this_dim].attrs) and (da[this_dim].attrs['axis'] == 'Z'):
dim = this_dim
dim_found = True
if not dim_found:
raise PCMFeatureError("You must specify a vertical dimension name: "\
"use argument 'dim' or "\
"specify DataSet dimension the attribute 'axis' to 'Z' (CF1.6)")
elif dim not in da.dims:
raise ValueError("Vertical dimension %s not found in this DataArray" % dim)
sampling_dims = list(da.dims)
sampling_dims.remove(dim)
X = da.stack({'sampling': sampling_dims}) #todo Improve performance for this operation !
# Apply all-features mask:
X = X.where(mask_stacked == 1, drop=True).transpose()
z = da[dim].values
X = X.chunk(chunks={'sampling': self._props['chunk_size']})
return X, z, sampling_dims
[docs] def unravel(self, ds, sampling_dims, X):
""" Create a DataArray from a numpy array and sampling dimensions """
# Load mask where all features are available for this PCM:
mask_stacked = self._xmask
#
coords = list()
size = list()
for dim in sampling_dims:
coords.append(ds[dim])
size.append(len(ds[dim]))
da = xr.DataArray(np.empty((size)), coords=coords)
da = da.stack({'sampling': sampling_dims})
da = da.where(mask_stacked == 1, drop=True).transpose()
da.values = X
da = da.unstack('sampling')
if (np.prod(da.shape) != mask_stacked.shape[0]):
if self._debug:
print("\tUnravelled data not matching mask dimension, re-indexing")
mask = mask_stacked.unstack()
da = da.reindex_like(mask)
return da
@property
def K(self):
"""Return the number of classes"""
return self._props['K']
@property
def F(self):
"""Return the number of features"""
return self._props['F']
@property
def features(self):
"""Return features definition dictionnary"""
return self._props['features']
@property
def plot(self):
"""Access plotting functions"""
# Create a mutable instance on 1st call so that later changes will be reflected in future calls
# https://stackoverflow.com/a/8140747
if 'plot' not in self._register:
self._register['plot'] = [_PlotMethods(self)]
return self._register['plot'][0]
@property
def stat(self):
"""Access statistics functions"""
return _StatMethods(self)
@property
def timeit(self):
""" Return a :class:`pandas.DataFrame` with Execution time of method called on this instance """
def get_multindex(times):
""" Create multi-index pandas """
# Get max levels:
dpt = list()
[dpt.append(len(key.split("."))) for key in times]
max_dpt = np.max(dpt)
# Read index:
levels_1 = list()
levels_2 = list()
levels_3 = list()
levels_4 = list()
if max_dpt == 1:
for key in times:
levels = key.split(".")
levels_1.append(levels[0])
return max_dpt, [levels_1]
elif max_dpt == 2:
for key in times:
levels = key.split(".")
if len(levels) == 1:
levels_1.append(levels[0])
levels_2.append('total')
if len(levels) == 2:
levels_1.append(levels[0])
levels_2.append(levels[1])
return max_dpt, [levels_1,levels_2]
elif max_dpt == 3:
for key in times:
levels = key.split(".")
# print(len(levels), levels)
if len(levels) == 1:
levels_1.append(levels[0])
levels_2.append('total')
levels_3.append('')
if len(levels) == 2:
levels_1.append(levels[0])
levels_2.append(levels[1])
levels_3.append('total')
if len(levels) == 3:
levels_1.append(levels[0])
levels_2.append(levels[1])
levels_3.append(levels[2])
return max_dpt, [levels_1,levels_2,levels_3]
elif max_dpt == 4:
for key in times:
levels = key.split(".")
if len(levels) == 1:
levels_1.append(levels[0])
levels_2.append('total')
levels_3.append('')
levels_4.append('')
if len(levels) == 2:
levels_1.append(levels[0])
levels_2.append(levels[1])
levels_3.append('total')
levels_4.append('')
if len(levels) == 3:
levels_1.append(levels[0])
levels_2.append(levels[1])
levels_3.append(levels[2])
levels_4.append('total')
if len(levels) == 4:
levels_1.append(levels[0])
levels_2.append(levels[1])
levels_3.append(levels[2])
levels_4.append(levels[3])
return max_dpt, [levels_1,levels_2,levels_3,levels_4]
times = self._timeit
max_dpt, arrays = get_multindex(times)
if max_dpt == 1:
index = pd.Index(arrays[0], names=['Method'])
df = pd.Series([np.sum(times[key]) for key in times], index=index)
# df = df.T
elif max_dpt == 2:
tuples = list(zip(*arrays))
index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method'])
df = pd.Series([np.sum(times[key]) for key in times], index=index)
df = df.unstack(0)
df = df.drop('total')
df = df.T
elif max_dpt == 3:
tuples = list(zip(*arrays))
index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method', 'Sub-sub-method'])
df = pd.Series([np.sum(times[key]) for key in times], index=index)
# df = df.unstack(0)
elif max_dpt == 4:
tuples = list(zip(*arrays))
index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method', 'Sub-sub-method',
'Sub-sub-sub-method'])
df = pd.Series([np.sum(times[key]) for key in times], index=index)
return df
@property
def backend(self):
"""Return the name of the statistic backend"""
return self._props['backend']
@property
def fitstats(self):
""" Estimator fit properties
The number of samples processed by the estimator
Will be reset on new calls to fit, but increments across partial_fit calls.
"""
return self._fit_stats
[docs] @docstring(io.to_netcdf.__doc__)
def to_netcdf(self, ncfile, **ka):
""" Save PCM to netcdf file
Parameters
----------
path : str
Path to file
"""
return io.to_netcdf(self, ncfile, **ka)
def display(self, deep=False):
"""Display detailed parameters of the PCM
This is not a get_params because it doesn't return a dictionary
Set Boolean option 'deep' to True for all properties display
"""
summary = [("<pcm '%s' (K: %i, F: %i)>")%(self._props['with_classifier'],
self._props['K'],
len(self._props['features']))]
# PCM core properties:
prop_info = ('Number of class: %i') % self._props['K']
summary.append(prop_info)
prop_info = ('Number of feature: %i') % len(self._props['features'])
summary.append(prop_info)
prop_info = ('Feature names: %s') % (repr(self._props['features'].keys()))
summary.append(prop_info)
# prop_info = ('Feature axis: [%s, ..., %s]') % (repr(self._props['features'][0]),
# repr(self._props['feature_axis'][-1]))
# summary.append(prop_info)
prop_info = ('Fitted: %r') % hasattr(self, 'fitted')
summary.append(prop_info)
# PCM workflow parameters:
for feature in self._props['features']:
prop_info = "Feature: '%s'" % feature
summary.append(prop_info)
summary.append("\t Interpoler: %s"%(type(self._interpoler[feature])))
# prop_info = ('\t Sample Scaling: %r') %
# summary.append(prop_info)
summary.append("\t Scaler: %r, %s"%(self._props['with_scaler'], type(self._scaler[feature])))
if (deep):
# summary.append("\t\t Scaler properties:")
d = self._scaler[feature].get_params(deep=deep)
for p in d: summary.append(("\t\t %s: %r")%(p,d[p]))
# prop_info = ('\t Dimensionality Reduction: %r') %
# summary.append(prop_info)
summary.append("\t Reducer: %r, %s"%(self._props['with_reducer'], type(self._reducer[feature])))
if (deep):
# summary.append("\t\t Reducer properties:")
d = self._reducer[feature].get_params(deep=deep)
for p in d: summary.append(("\t\t %s: %r")%(p,d[p]))
# return '\n'.join(summary)
# prop_info = ('Classification: %r') %
# summary.append(prop_info)
summary.append("Classifier: %r, %s"%(self._props['with_classifier'], type(self._classifier)))
#prop_info = ('GMM covariance type: %s') % self._props['COVARTYPE']
#summary.append(prop_info)
if (hasattr(self,'fitted')):
prop_info = ('\t log likelihood of the training set: %f') % self._props['llh']
summary.append(prop_info)
if (deep):
summary.append("\t Classifier properties:")
d = self._classifier.get_params(deep=deep)
for p in d: summary.append(("\t\t %s: %r")%(p,d[p]))
# Done
return '\n'.join(summary)
def preprocessing_this(self, da, dim=None, feature_name=str(), action='?'):
"""Pre-process data before anything
Possible pre-processing steps:
- interpolation,
- scaling,
- reduction
Parameters
----------
da: :class:`xarray.DataArray`
The DataArray to process
dim: str
Name of the vertical dimension in the input :class:`xarray.DataArray`
feature_name: str
Target PCM feature name for the input :class:`xarray.DataArray`
Returns
-------
X: np.array
Pre-processed feature, with dimensions (N_SAMPLE, N_FEATURES)
sampling_dims: list()
List of the input :class:`xarray.DataArray` dimensions stacked as sampling points
"""
this_context = str(action)+'.1-preprocess.2-feature_'+feature_name
with self._context(this_context + '.total', self._context_args):
# MAKE THE ND-ARRAY A 2D-ARRAY
with self._context(this_context + '.1-ravel', self._context_args):
X, z, sampling_dims = self.ravel(da, dim=dim, feature_name=feature_name)
if self._debug:
print("\t", "X RAVELED with success", str(LogDataType(X)))
# INTERPOLATION STEP:
with self._context(this_context + '.2-interp', self._context_args):
X = self._interpoler[feature_name].transform(X, z)
if self._debug:
if isinstance(self._interpoler[feature_name], NoTransform):
print("\t", "X INTERPOLATED with success (NoTransform)", str(LogDataType(X)))
else:
print("\t", "X INTERPOLATED with success", str(LogDataType(X)))
# print(X.values.flags['WRITEABLE'])
# After the interpolation step, we must not have nan in the 2d array:
assert_all_finite(X, allow_nan=False)
# FIT STEPS:
# We need to fit pre-processing methods in order to re-use them when
# predicting a new dataset
# SCALING:
with self._context(this_context+'.3-scale_fit', self._context_args):
if not hasattr(self, 'fitted'):
self._scaler[feature_name].fit(X.data)
if 'units' in da.attrs:
self._scaler_props[feature_name]['units'] = da.attrs['units']
with self._context(this_context + '.4-scale_transform', self._context_args):
try:
X.data = self._scaler[feature_name].transform(X.data, copy=False)
except ValueError:
if self._debug: print("\t\t Fail to scale.transform without copy, fall back on copy=True")
try:
X.data = self._scaler[feature_name].transform(X.data, copy=True)
except ValueError:
if self._debug: print("\t\t Fail to scale.transform with copy, fall back on input copy")
X.data = self._scaler[feature_name].transform(X.data.copy())
pass
except:
if self._debug: print(X.values.flags['WRITEABLE'])
raise
pass
except:
raise
if self._debug:
print("\t", "X SCALED with success)", str(LogDataType(X)))
# REDUCTION:
with self._context(this_context + '.5-reduce_fit', self._context_args):
if (not hasattr(self, 'fitted')) and (self._props['with_reducer']):
if self.backend == 'dask_ml':
# We have to convert any type of data array into a Dask array because
# dask_ml cannot handle anything else (!)
#todo Raise an issue on dask_ml github to ask why is this choice made
# Related issues:
# https://github.com/dask/dask-ml/issues/6
# https://github.com/dask/dask-ml/issues/541
# https://github.com/dask/dask-ml/issues/542
X.data = dask.array.asarray(X.data, chunks=X.shape)
if isinstance(X.data, dask.array.Array):
self._reducer[feature_name].fit(X.data)
else:
self._reducer[feature_name].fit(X)
with self._context(this_context + '.6-reduce_transform', self._context_args):
X = self._reducer[feature_name].transform(X.data) # Reduction, return np.array
# After reduction the new array is [ sampling, reduced_dim ]
X = xr.DataArray(X,
dims=['sampling', 'n_features'],
coords={'sampling': range(0, X.shape[0]),
'n_features': np.arange(0,X.shape[1])})
if self._debug:
print("\t", "X REDUCED with success)", str(LogDataType(X)))
# Output:
return X, sampling_dims
def preprocessing(self, ds, features=None, dim=None, action='?', mask=None):
""" Dataset pre-processing of feature(s)
Depending on pyXpcm set-up, pre-processing steps can be:
- interpolation,
- scaling,
- reduction
Parameters
----------
ds: :class:`xarray.Dataset`
The dataset to work with
features: dict()
Definitions of PCM features in the input :class:`xarray.Dataset`.
If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
dim: str
Name of the vertical dimension in the input :class:`xarray.Dataset`
Returns
-------
X: np.array
Pre-processed set of features, with dimensions (N_SAMPLE, N_FEATURES)
sampling_dims: list()
List of the input :class:`xarray.Dataset` dimensions stacked as sampling points
"""
this_context = str(action)+'.1-preprocess'
with self._context(this_context, self._context_args):
if self._debug:
print("> Start preprocessing for action '%s'" % action)
# How do we find feature variable in this dataset ?
features_dict = ds.pyxpcm.feature_dict(self, features=features)
# Determine mask where all features are defined for this PCM:
with self._context(this_context + '.1-mask', self._context_args):
if not mask:
mask = ds.pyxpcm.mask(self, features=features, dim=dim)
# Stack all-features mask:
mask = mask.stack({'sampling': list(mask.dims)})
self._xmask = mask
# Pre-process all features and build the X array
X = np.empty(())
Xlabel = list() # Construct a list of string labels for each feature dimension (useful for plots)
F = self.F # Nb of features
for feature_in_pcm in features_dict:
feature_in_ds = features_dict[feature_in_pcm]
if self._debug:
print( ("\n\t> Preprocessing xarray dataset '%s' as PCM feature '%s'")\
%(feature_in_ds, feature_in_pcm) )
if ('maxlevel' in self._context_args) and (self._context_args['maxlevel'] <= 2):
a = this_context + '.2-features'
else:
a = this_context
with self._context(a, self._context_args):
da = ds[feature_in_ds]
x, sampling_dims = self.preprocessing_this(da,
dim=dim,
feature_name=feature_in_pcm,
action=action)
xlabel = ["%s_%i"%(feature_in_pcm, i) for i in range(0, x.shape[1])]
if self._debug:
print("\t%s pre-processed with success, " % feature_in_pcm, str(LogDataType(x)))
with self._context(this_context + '.3-homogeniser', self._context_args):
# Store full array mean and std during fit:
if F>1:
# For more than 1 feature, we need to make them comparable,
# so we normalise each features by their global stats:
# FIT:
if (action == 'fit') or (action == 'fit_predict'):
self._homogeniser[feature_in_pcm]['mean'] = x.mean().values
self._homogeniser[feature_in_pcm]['std'] = x.std().values
#todo _homogeniser should be a proper standard scaler
# TRANSFORM:
x = (x-self._homogeniser[feature_in_pcm]['mean'])/\
self._homogeniser[feature_in_pcm]['std']
if self._debug and action == 'fit':
print(("\tHomogenisation for fit of %s") % (feature_in_pcm))
elif self._debug:
print(("\tHomogenisation of %s using fit data") % (feature_in_pcm))
elif self._debug:
print(("\tNo need for homogenisation of %s") % (feature_in_pcm))
if np.prod(X.shape) == 1:
X = x
Xlabel = xlabel
else:
X = np.append(X, x, axis=1)
[Xlabel.append(i) for i in xlabel]
with self._context(this_context + '.4-xarray', self._context_args):
self._xlabel = Xlabel
if self._debug:
print("\tFeatures array shape and type for xarray:",
X.shape, type(X), type(X.data))
X = xr.DataArray(X, dims=['n_samples', 'n_features'],
coords={'n_samples': range(0, X.shape[0]), 'n_features': Xlabel})
if self._debug:
print("> Preprocessing done, working with final X (%s) array of shape:" % type(X), X.shape,
" and sampling dimensions:", sampling_dims)
return X, sampling_dims
[docs] def fit(self, ds, features=None, dim=None):
"""Estimate PCM parameters
For a PCM, the fit method consists in the following operations:
- pre-processing
- interpolation to the ``feature_axis`` levels of the model
- scaling
- reduction
- estimate classifier parameters
Parameters
----------
ds: :class:`xarray.Dataset`
The dataset to work with
features: dict()
Definitions of PCM features in the input :class:`xarray.Dataset`.
If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
dim: str
Name of the vertical dimension in the input :class:`xarray.Dataset`
Returns
-------
self
"""
with self._context('fit', self._context_args) :
# PRE-PROCESSING:
X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='fit')
# CLASSIFICATION-MODEL TRAINING:
with self._context('fit.fit', self._context_args):
self._classifier.fit(X)
with self._context('fit.score', self._context_args):
self._props['llh'] = self._classifier.score(X)
# Furthermore gather some information about the fit:
self._fit_stats['score'] = self._props['llh']
self._fit_stats['datetime'] = datetime.utcnow()
if 'n_samples_seen_' not in self._classifier.__dict__:
self._fit_stats['n_samples_seen_'] = X.shape[0]
else:
self._fit_stats['n_samples_seen_'] = self._classifier.n_samples_seen_
if 'n_iter_' in self._classifier.__dict__:
self._fit_stats['n_iter_'] = self._classifier.n_iter_
# Done:
self.fitted = True
return self
[docs] def predict(self, ds, features=None, dim=None, inplace=False, name='PCM_LABELS'):
"""Predict labels for profile samples
This method add these properties to the PCM object:
- ``llh``: The log likelihood of the model with regard to new data
Parameters
----------
ds: :class:`xarray.Dataset`
The dataset to work with
features: dict()
Definitions of PCM features in the input :class:`xarray.Dataset`.
If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
dim: str
Name of the vertical dimension in the input :class:`xarray.Dataset`
inplace: boolean, False by default
If False, return a :class:`xarray.DataArray` with predicted labels
If True, return the input :class:`xarray.Dataset` with labels added as a new :class:`xarray.DataArray`
name: str, default is 'PCM_LABELS'
Name of the :class:`xarray.DataArray` with labels
Returns
-------
:class:`xarray.DataArray`
Component labels (if option 'inplace' = False)
*or*
:class:`xarray.Dataset`
Input dataset with Component labels as a 'PCM_LABELS' new :class:`xarray.DataArray`
(if option 'inplace' = True)
"""
with self._context('predict', self._context_args):
# Check if the PCM is trained:
validation.check_is_fitted(self, 'fitted')
# PRE-PROCESSING:
X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='predict')
# CLASSIFICATION PREDICTION:
with self._context('predict.predict', self._context_args):
labels = self._classifier.predict(X)
with self._context('predict.score', self._context_args):
llh = self._classifier.score(X)
# Create a xarray with labels output:
with self._context('predict.xarray', self._context_args):
da = self.unravel(ds, sampling_dims, labels).rename(name)
da.attrs['long_name'] = 'PCM labels'
da.attrs['units'] = ''
da.attrs['valid_min'] = 0
da.attrs['valid_max'] = self._props['K']-1
da.attrs['llh'] = llh
# Preserve attributes of coordinates:
for c in da.coords:
da[c].attrs = ds[c].attrs
# Add labels to the dataset:
if inplace:
return ds.pyxpcm.add(da)
else:
return da
[docs] def fit_predict(self, ds, features=None, dim=None, inplace=False, name='PCM_LABELS'):
"""Estimate PCM parameters and predict classes.
This method add these properties to the PCM object:
- ``llh``: The log likelihood of the model with regard to new data
Parameters
----------
ds: :class:`xarray.Dataset`
The dataset to work with
features: dict()
Definitions of PCM features in the input :class:`xarray.Dataset`.
If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
dim: str
Name of the vertical dimension in the input :class:`xarray.Dataset`
inplace: boolean, False by default
If False, return a :class:`xarray.DataArray` with predicted labels
If True, return the input :class:`xarray.Dataset` with labels added as a new :class:`xarray.DataArray`
name: string ('PCM_LABELS')
Name of the DataArray holding labels.
Returns
-------
:class:`xarray.DataArray`
Component labels (if option 'inplace' = False)
*or*
:class:`xarray.Dataset`
Input dataset with component labels as a 'PCM_LABELS' new :class:`xarray.DataArray` (if option 'inplace' = True)
"""
with self._context('fit_predict', self._context_args):
# PRE-PROCESSING:
X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='fit_predict')
# CLASSIFICATION-MODEL TRAINING:
with self._context('fit_predict.fit', self._context_args):
self._classifier.fit(X)
with self._context('fit_predict.score', self._context_args):
self._props['llh'] = self._classifier.score(X)
# Furthermore gather some information about this fit:
self._fit_stats['score'] = self._props['llh']
if 'n_samples_seen_' not in self._classifier.__dict__:
self._fit_stats['n_samples_seen_'] = X.shape[0]
else:
self._fit_stats['n_samples_seen_'] = self._classifier.n_samples_seen_
if 'n_iter_' in self._classifier.__dict__:
self._fit_stats['n_iter_'] = self._classifier.n_iter_
# Done:
self.fitted = True
# CLASSIFICATION PREDICTION:
with self._context('fit_predict.predict', self._context_args):
labels = self._classifier.predict(X)
# Create a xarray with labels output:
with self._context('fit_predict.xarray', self._context_args):
da = self.unravel(ds, sampling_dims, labels).rename(name)
da.attrs['long_name'] = 'PCM labels'
da.attrs['units'] = ''
da.attrs['valid_min'] = 0
da.attrs['valid_max'] = self._props['K']-1
da.attrs['llh'] = self._props['llh']
# Preserve attributes of coordinates:
for c in da.coords:
da[c].attrs = ds[c].attrs
# Add labels to the dataset:
if inplace:
return ds.pyxpcm.add(da)
else:
return da
[docs] def predict_proba(self, ds, features=None, dim=None, inplace=False, name='PCM_POST', classdimname='pcm_class'):
"""Predict posterior probability of each components given the data
This method adds these properties to the PCM instance:
- ``llh``: The log likelihood of the model with regard to new data
Parameters
----------
ds: :class:`xarray.Dataset`
The dataset to work with
features: dict()
Definitions of PCM features in the input :class:`xarray.Dataset`.
If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
dim: str
Name of the vertical dimension in the input :class:`xarray.Dataset`
inplace: boolean, False by default
If False, return a :class:`xarray.DataArray` with predicted probabilities
If True, return the input :class:`xarray.Dataset` with probabilities added as a new :class:`xarray.DataArray`
name: str, default is 'PCM_POST'
Name of the DataArray with prediction probability (posteriors)
classdimname: str, default is 'pcm_class'
Name of the dimension holding classes
Returns
-------
:class:`xarray.DataArray`
Probability of each Gaussian (state) in the model given each
sample (if option 'inplace' = False)
*or*
:class:`xarray.Dataset`
Input dataset with Component Probability as a 'PCM_POST' new :class:`xarray.DataArray`
(if option 'inplace' = True)
"""
with self._context('predict_proba', self._context_args):
# Check if the PCM is trained:
validation.check_is_fitted(self, 'fitted')
# PRE-PROCESSING:
X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='predict_proba')
# CLASSIFICATION PREDICTION:
with self._context('predict_proba.predict', self._context_args):
post_values = self._classifier.predict_proba(X)
with self._context('predict_proba.score', self._context_args):
self._props['llh'] = self._classifier.score(X)
# Create a xarray with posteriors:
with self._context('predict_proba.xarray', self._context_args):
P = list()
for k in range(self.K):
X = post_values[:, k]
x = self.unravel(ds, sampling_dims, X)
P.append(x)
da = xr.concat(P, dim=classdimname).rename(name)
da.attrs['long_name'] = 'PCM posteriors'
da.attrs['units'] = ''
da.attrs['valid_min'] = 0
da.attrs['valid_max'] = 1
da.attrs['llh'] = self._props['llh']
# Preserve attributes of coordinates:
for c in da.coords:
da[c].attrs = ds[c].attrs
# Add posteriors to the dataset:
if inplace:
return ds.pyxpcm.add(da)
else:
return da
[docs] def score(self, ds, features=None, dim=None):
"""Compute the per-sample average log-likelihood of the given data
Parameters
----------
ds: :class:`xarray.Dataset`
The dataset to work with
features: dict()
Definitions of PCM features in the input :class:`xarray.Dataset`.
If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
dim: str
Name of the vertical dimension in the input :class:`xarray.Dataset`
Returns
-------
log_likelihood: float
In the case of a GMM classifier, this is the Log likelihood of the Gaussian mixture given data
"""
with self._context('score', self._context_args):
# Check if the PCM is trained:
validation.check_is_fitted(self, 'fitted')
# PRE-PROCESSING:
X, sampling_dims = self.preprocessing(ds, features=features, action='score')
# COMPUTE THE PREDICTION SCORE:
with self._context('score.score', self._context_args):
llh = self._classifier.score(X)
return llh
[docs] def bic(self, ds, features=None, dim=None):
"""Compute Bayesian information criterion for the current model on the input dataset
Only for a GMM classifier
Parameters
----------
ds: :class:`xarray.Dataset`
The dataset to work with
features: dict()
Definitions of PCM features in the input :class:`xarray.Dataset`.
If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
dim: str
Name of the vertical dimension in the input :class:`xarray.Dataset`
Returns
-------
bic: float
The lower the better
"""
with self._context('bic', self._context_args):
# Check classifier:
if self._props['with_classifier'] != 'gmm':
raise Exception( ("BIC is only available for the 'gmm' classifier ('%s')")%\
(self._props['with_classifier']) )
def _n_parameters(_classifier):
"""Return the number of free parameters in the model. See sklearn code"""
_, n_features = _classifier.means_.shape
if _classifier.covariance_type == 'full':
cov_params = _classifier.n_components * n_features * (n_features + 1) / 2.
elif _classifier.covariance_type == 'diag':
cov_params = _classifier.n_components * n_features
elif _classifier.covariance_type == 'tied':
cov_params = n_features * (n_features + 1) / 2.
elif _classifier.covariance_type == 'spherical':
cov_params = _classifier.n_components
mean_params = n_features * _classifier.n_components
return int(cov_params + mean_params + _classifier.n_components - 1)
# Check if the PCM is trained:
validation.check_is_fitted(self, 'fitted')
# PRE-PROCESSING:
X, sampling_dims = self.preprocessing(ds, features=features, action='bic')
# COMPUTE THE log-likelihood:
with self._context('bic.score', self._context_args):
llh = self._classifier.score(X)
# COMPUTE BIC:
N_samples = X.shape[0]
bic = (-2 * llh * N_samples + _n_parameters(self._classifier) * np.log(N_samples))
return bic