Source code for pyxpcm.models

# -*- coding: utf-8 -*-
"""

.. module:: pyxpcm
   :synopsis: Profile Classification Model

.. moduleauthor:: Guillaume Maze <gmaze@ifremer.fr>

Multi-variables classification, ie use of more than physical variable as PCM features

Created on 2019/09/27
@author: G. Maze (Ifremer/LOPS)
"""

import os
import sys
import numpy as np
import pandas as pd
import xarray as xr
import collections
import inspect
import dask

# import warnings
import time
from contextlib import contextmanager
from datetime import datetime

# Internal:
from .plot import _PlotMethods
from .stat import _StatMethods
from .utils import LogDataType, Vertical_Interpolator, NoTransform, StatisticsBackend, docstring
from . import io

# Scikit-learn useful methods:
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline
from sklearn.utils import validation
from sklearn.utils import assert_all_finite
from sklearn.exceptions import NotFittedError

####### Scikit-learn statistic backend:
# http://scikit-learn.org/stable/modules/mixture.html
from sklearn.mixture import GaussianMixture

class PCMFeatureError(Exception):
    """Exception raised when features not correct"""

class PCMClassError(Exception):
    """Exception raised when classes not correct"""

[docs]class pcm(object): """Profile Classification Model class constructor Consume and return :mod:`xarray` objects """
[docs] def __init__(self, K:int, features:dict(), scaling=1, reduction=1, maxvar=15, classif='gmm', covariance_type='full', verb=False, debug=False, timeit=False, timeit_verb=False, chunk_size='auto', backend='sklearn'): """Create the PCM instance Parameters ---------- K: int The number of class, or cluster, in the classification model. features: dict() The vertical axis to use for each features. eg: {'temperature':np.arange(-2000,0,1)} scaling: int (default: 1) Define the scaling method: - 0: No scaling - **1: Center on sample mean and scale by sample std** - 2: Center on sample mean only reduction: int (default: 1) Define the dimensionality reduction method: - 0: No reduction - **1: Reduction using :class:`sklearn.decomposition.PCA`** maxvar: float (default: 99.9) Maximum feature variance to preserve in the reduced dataset using :class:`sklearn.decomposition.PCA`. In %. classif: str (default: 'gmm') Define the classification method. The only method available as of now is a Gaussian Mixture Model. See :class:`sklearn.mixture.GaussianMixture` for more details. covariance_type: str (default: 'full') Define the type of covariance matrix shape to be used in the default classifier GMM. It can be ‘full’ (default), ‘tied’, ‘diag’ or ‘spherical’. verb: boolean (default: False) More verbose output timeit: boolean (default: False) Register time of operation for performance evaluation timeit_verb: boolean (default: False) Print time of operation during execution chunk_size: 'auto' or int Sampling chunk size, (array of features after pre-processing) backend: str Statistic library backend, 'sklearn' (default) or 'dask_ml' """ if K==0: raise PCMClassError("Can't create a PCM with K=0") if K is None: raise PCMClassError("K must be defined to create a PMC") if not bool(features): raise PCMFeatureError("Can't create a PCM without features") if scaling==0: with_scaler = 'none'; with_mean=False; with_std = False elif scaling==1: with_scaler = 'normal'; with_mean=True; with_std = True elif scaling==2: with_scaler = 'center'; with_mean=True; with_std = False else: raise NameError('scaling must be 0, 1 or 2') if reduction==0: with_reducer = False elif reduction==1: with_reducer = True else: raise NameError('reduction must be 0 or 1') if classif=='gmm': with_classifier = 'gmm'; else: raise NameError("classifier must be 'gmm' (no other methods implemented at this time)") #todo check validity of the dict of features self._props = {'K': int(K), 'F': len(features), 'llh': None, 'COVARTYPE': covariance_type, 'with_scaler': with_scaler, 'with_reducer': with_reducer, 'with_classifier': with_classifier, 'maxvar': maxvar, 'features': collections.OrderedDict(features), 'chunk_size': chunk_size, 'backend': backend} self._xmask = None # xarray mask for nd-array used at pre-processing steps self._register = collections.OrderedDict() # Will register mutable instances of sub-modules like 'plot' self._verb = verb #todo _verb is a property, should be set/get with a decorator self._debug = debug self._interpoler = collections.OrderedDict() self._scaler = collections.OrderedDict() self._scaler_props = collections.OrderedDict() self._reducer = collections.OrderedDict() self._homogeniser = collections.OrderedDict() # Load estimators for a specific backend: bck = StatisticsBackend(backend, scaler='StandardScaler', reducer='PCA') for feature_name in features: feature_axis = self._props['features'][feature_name] if isinstance(feature_axis, xr.DataArray): self._props['features'][feature_name] = feature_axis.values # self._scaler[feature_name] = preprocessing.StandardScaler(with_mean=with_mean, # with_std=with_std) if 'none' not in self._props['with_scaler']: self._scaler[feature_name] = bck.scaler(with_mean=with_mean, with_std=with_std) else: self._scaler[feature_name] = NoTransform() self._scaler_props[feature_name] = {'units': '?'} is_slice = np.all(feature_axis == None) if not is_slice: self._interpoler[feature_name] = Vertical_Interpolator(axis=feature_axis, debug=self._debug) if np.prod(feature_axis.shape) == 1: # Single level: no need to reduce if self._debug: print('Single level, not need to reduce', np.prod(feature_axis.ndim)) self._reducer[feature_name] = NoTransform() else: # Multi-vertical-levels, set reducer: if with_reducer: self._reducer[feature_name] = bck.reducer(n_components=self._props['maxvar'], svd_solver='full') else: self._reducer[feature_name] = NoTransform() else: self._interpoler[feature_name] = NoTransform() self._reducer[feature_name] = NoTransform() if self._debug: print("%s is single level, no need to reduce" % feature_name) self._homogeniser[feature_name] = {'mean': 0, 'std': 1} self._classifier = GaussianMixture(n_components=self._props['K'], covariance_type=self._props['COVARTYPE'], init_params='kmeans', max_iter=1000, tol=1e-6) # Define the "context" to execute some functions inner code # (useful for time benchmarking) self._context = self.__empty_context # Default is empty, do nothing self._context_args = dict() if timeit: self._context = self.__timeit_context self._context_args = {'maxlevel': 3, 'verb':timeit_verb} self._timeit = dict() # Define statistics for the fit method: self._fit_stats = dict({'datetime': None, 'n_samples_seen_': None, 'score': None, 'etime': None})
@contextmanager def __timeit_context(self, name, opts=dict()): default_opts = {'maxlevel': np.inf, 'verb':False} for key in opts: if key in default_opts: default_opts[key] = opts[key] level = len([i for i in range(len(name)) if name.startswith('.', i)]) if level <= default_opts['maxlevel']: startTime = time.time() yield elapsedTime = time.time() - startTime trailingspace = " " * level trailingspace = " " if default_opts['verb']: # print('... time in {} {}: {} ms'.format(trailingspace, name, int(elapsedTime * 1000))) print('{} {}: {} ms'.format(trailingspace, name, int(elapsedTime * 1000))) if name in self._timeit: self._timeit[name].append(elapsedTime * 1000) else: self._timeit[name] = list([elapsedTime*1000]) else: yield @contextmanager def __empty_context(self, name, *args, **kargs): yield def __call__(self, **kwargs): self.__init__(**kwargs) def __iter__(self): self.__i = 0 return self def __next__(self): if self.__i < self.K: i = self.__i self.__i += 1 return i else: raise StopIteration() def __repr__(self): return self.display(deep=self._verb)
[docs] def ravel(self, da, dim=None, feature_name=str): """ Extract from N-d array a X(feature,sample) 2-d array and vertical dimension z Parameters ---------- da: :class:`xarray.DataArray` The DataArray to process dim: str Name of the vertical dimension in the input :class:`xarray.DataArray` feature_name: str Target PCM feature name for the input :class:`xarray.DataArray` Returns ------- X: :class:`xarray.DataArray` A new DataArray with dimension ['n_sampling','n_features'] Note that data are always :class:`dask.array.Array`. z: :class:`numpy.array` The vertical axis of data sampling_dims: dict() Dictionary where keys are :class:`xarray.Dataset` variable names of features and values are another dictionary with the list of sampling dimension in ``DIM_SAMPLING`` key and the name of the vertical axis in the ``DIM_VERTICAL`` key. Examples -------- This function is meant to be used internally only __author__: gmaze@ifremer.fr """ # Is this a thick array or a slice ? is_slice = np.all(self._props['features'][feature_name] == None) # Load mask where all features are available for this PCM: mask_stacked = self._xmask if is_slice: # No vertical dimension to use, simple stacking sampling_dims = list(da.dims) # Apply all-features mask: X = da.stack({'sampling': sampling_dims}) X = X.where(mask_stacked == 1, drop=True).expand_dims('dummy').transpose()#.values z = np.empty((1,)) else: if not dim: # Try to infer the vertical dimension name looking for the CF 'axis' attribute in all dimensions dim_found = False for this_dim in da.dims: if ('axis' in da[this_dim].attrs) and (da[this_dim].attrs['axis'] == 'Z'): dim = this_dim dim_found = True if not dim_found: raise PCMFeatureError("You must specify a vertical dimension name: "\ "use argument 'dim' or "\ "specify DataSet dimension the attribute 'axis' to 'Z' (CF1.6)") elif dim not in da.dims: raise ValueError("Vertical dimension %s not found in this DataArray" % dim) sampling_dims = list(da.dims) sampling_dims.remove(dim) X = da.stack({'sampling': sampling_dims}) #todo Improve performance for this operation ! # Apply all-features mask: X = X.where(mask_stacked == 1, drop=True).transpose() z = da[dim].values X = X.chunk(chunks={'sampling': self._props['chunk_size']}) return X, z, sampling_dims
[docs] def unravel(self, ds, sampling_dims, X): """ Create a DataArray from a numpy array and sampling dimensions """ # Load mask where all features are available for this PCM: mask_stacked = self._xmask # coords = list() size = list() for dim in sampling_dims: coords.append(ds[dim]) size.append(len(ds[dim])) da = xr.DataArray(np.empty((size)), coords=coords) da = da.stack({'sampling': sampling_dims}) da = da.where(mask_stacked == 1, drop=True).transpose() da.values = X da = da.unstack('sampling') if (np.prod(da.shape) != mask_stacked.shape[0]): if self._debug: print("\tUnravelled data not matching mask dimension, re-indexing") mask = mask_stacked.unstack() da = da.reindex_like(mask) return da
@property def K(self): """Return the number of classes""" return self._props['K'] @property def F(self): """Return the number of features""" return self._props['F'] @property def features(self): """Return features definition dictionnary""" return self._props['features'] @property def plot(self): """Access plotting functions""" # Create a mutable instance on 1st call so that later changes will be reflected in future calls # https://stackoverflow.com/a/8140747 if 'plot' not in self._register: self._register['plot'] = [_PlotMethods(self)] return self._register['plot'][0] @property def stat(self): """Access statistics functions""" return _StatMethods(self) @property def timeit(self): """ Return a :class:`pandas.DataFrame` with Execution time of method called on this instance """ def get_multindex(times): """ Create multi-index pandas """ # Get max levels: dpt = list() [dpt.append(len(key.split("."))) for key in times] max_dpt = np.max(dpt) # Read index: levels_1 = list() levels_2 = list() levels_3 = list() levels_4 = list() if max_dpt == 1: for key in times: levels = key.split(".") levels_1.append(levels[0]) return max_dpt, [levels_1] elif max_dpt == 2: for key in times: levels = key.split(".") if len(levels) == 1: levels_1.append(levels[0]) levels_2.append('total') if len(levels) == 2: levels_1.append(levels[0]) levels_2.append(levels[1]) return max_dpt, [levels_1,levels_2] elif max_dpt == 3: for key in times: levels = key.split(".") # print(len(levels), levels) if len(levels) == 1: levels_1.append(levels[0]) levels_2.append('total') levels_3.append('') if len(levels) == 2: levels_1.append(levels[0]) levels_2.append(levels[1]) levels_3.append('total') if len(levels) == 3: levels_1.append(levels[0]) levels_2.append(levels[1]) levels_3.append(levels[2]) return max_dpt, [levels_1,levels_2,levels_3] elif max_dpt == 4: for key in times: levels = key.split(".") if len(levels) == 1: levels_1.append(levels[0]) levels_2.append('total') levels_3.append('') levels_4.append('') if len(levels) == 2: levels_1.append(levels[0]) levels_2.append(levels[1]) levels_3.append('total') levels_4.append('') if len(levels) == 3: levels_1.append(levels[0]) levels_2.append(levels[1]) levels_3.append(levels[2]) levels_4.append('total') if len(levels) == 4: levels_1.append(levels[0]) levels_2.append(levels[1]) levels_3.append(levels[2]) levels_4.append(levels[3]) return max_dpt, [levels_1,levels_2,levels_3,levels_4] times = self._timeit max_dpt, arrays = get_multindex(times) if max_dpt == 1: index = pd.Index(arrays[0], names=['Method']) df = pd.Series([np.sum(times[key]) for key in times], index=index) # df = df.T elif max_dpt == 2: tuples = list(zip(*arrays)) index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method']) df = pd.Series([np.sum(times[key]) for key in times], index=index) df = df.unstack(0) df = df.drop('total') df = df.T elif max_dpt == 3: tuples = list(zip(*arrays)) index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method', 'Sub-sub-method']) df = pd.Series([np.sum(times[key]) for key in times], index=index) # df = df.unstack(0) elif max_dpt == 4: tuples = list(zip(*arrays)) index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method', 'Sub-sub-method', 'Sub-sub-sub-method']) df = pd.Series([np.sum(times[key]) for key in times], index=index) return df @property def backend(self): """Return the name of the statistic backend""" return self._props['backend'] @property def fitstats(self): """ Estimator fit properties The number of samples processed by the estimator Will be reset on new calls to fit, but increments across partial_fit calls. """ return self._fit_stats
[docs] @docstring(io.to_netcdf.__doc__) def to_netcdf(self, ncfile, **ka): """ Save PCM to netcdf file Parameters ---------- path : str Path to file """ return io.to_netcdf(self, ncfile, **ka)
def display(self, deep=False): """Display detailed parameters of the PCM This is not a get_params because it doesn't return a dictionary Set Boolean option 'deep' to True for all properties display """ summary = [("<pcm '%s' (K: %i, F: %i)>")%(self._props['with_classifier'], self._props['K'], len(self._props['features']))] # PCM core properties: prop_info = ('Number of class: %i') % self._props['K'] summary.append(prop_info) prop_info = ('Number of feature: %i') % len(self._props['features']) summary.append(prop_info) prop_info = ('Feature names: %s') % (repr(self._props['features'].keys())) summary.append(prop_info) # prop_info = ('Feature axis: [%s, ..., %s]') % (repr(self._props['features'][0]), # repr(self._props['feature_axis'][-1])) # summary.append(prop_info) prop_info = ('Fitted: %r') % hasattr(self, 'fitted') summary.append(prop_info) # PCM workflow parameters: for feature in self._props['features']: prop_info = "Feature: '%s'" % feature summary.append(prop_info) summary.append("\t Interpoler: %s"%(type(self._interpoler[feature]))) # prop_info = ('\t Sample Scaling: %r') % # summary.append(prop_info) summary.append("\t Scaler: %r, %s"%(self._props['with_scaler'], type(self._scaler[feature]))) if (deep): # summary.append("\t\t Scaler properties:") d = self._scaler[feature].get_params(deep=deep) for p in d: summary.append(("\t\t %s: %r")%(p,d[p])) # prop_info = ('\t Dimensionality Reduction: %r') % # summary.append(prop_info) summary.append("\t Reducer: %r, %s"%(self._props['with_reducer'], type(self._reducer[feature]))) if (deep): # summary.append("\t\t Reducer properties:") d = self._reducer[feature].get_params(deep=deep) for p in d: summary.append(("\t\t %s: %r")%(p,d[p])) # return '\n'.join(summary) # prop_info = ('Classification: %r') % # summary.append(prop_info) summary.append("Classifier: %r, %s"%(self._props['with_classifier'], type(self._classifier))) #prop_info = ('GMM covariance type: %s') % self._props['COVARTYPE'] #summary.append(prop_info) if (hasattr(self,'fitted')): prop_info = ('\t log likelihood of the training set: %f') % self._props['llh'] summary.append(prop_info) if (deep): summary.append("\t Classifier properties:") d = self._classifier.get_params(deep=deep) for p in d: summary.append(("\t\t %s: %r")%(p,d[p])) # Done return '\n'.join(summary) def preprocessing_this(self, da, dim=None, feature_name=str(), action='?'): """Pre-process data before anything Possible pre-processing steps: - interpolation, - scaling, - reduction Parameters ---------- da: :class:`xarray.DataArray` The DataArray to process dim: str Name of the vertical dimension in the input :class:`xarray.DataArray` feature_name: str Target PCM feature name for the input :class:`xarray.DataArray` Returns ------- X: np.array Pre-processed feature, with dimensions (N_SAMPLE, N_FEATURES) sampling_dims: list() List of the input :class:`xarray.DataArray` dimensions stacked as sampling points """ this_context = str(action)+'.1-preprocess.2-feature_'+feature_name with self._context(this_context + '.total', self._context_args): # MAKE THE ND-ARRAY A 2D-ARRAY with self._context(this_context + '.1-ravel', self._context_args): X, z, sampling_dims = self.ravel(da, dim=dim, feature_name=feature_name) if self._debug: print("\t", "X RAVELED with success", str(LogDataType(X))) # INTERPOLATION STEP: with self._context(this_context + '.2-interp', self._context_args): X = self._interpoler[feature_name].transform(X, z) if self._debug: if isinstance(self._interpoler[feature_name], NoTransform): print("\t", "X INTERPOLATED with success (NoTransform)", str(LogDataType(X))) else: print("\t", "X INTERPOLATED with success", str(LogDataType(X))) # print(X.values.flags['WRITEABLE']) # After the interpolation step, we must not have nan in the 2d array: assert_all_finite(X, allow_nan=False) # FIT STEPS: # We need to fit pre-processing methods in order to re-use them when # predicting a new dataset # SCALING: with self._context(this_context+'.3-scale_fit', self._context_args): if not hasattr(self, 'fitted'): self._scaler[feature_name].fit(X.data) if 'units' in da.attrs: self._scaler_props[feature_name]['units'] = da.attrs['units'] with self._context(this_context + '.4-scale_transform', self._context_args): try: X.data = self._scaler[feature_name].transform(X.data, copy=False) except ValueError: if self._debug: print("\t\t Fail to scale.transform without copy, fall back on copy=True") try: X.data = self._scaler[feature_name].transform(X.data, copy=True) except ValueError: if self._debug: print("\t\t Fail to scale.transform with copy, fall back on input copy") X.data = self._scaler[feature_name].transform(X.data.copy()) pass except: if self._debug: print(X.values.flags['WRITEABLE']) raise pass except: raise if self._debug: print("\t", "X SCALED with success)", str(LogDataType(X))) # REDUCTION: with self._context(this_context + '.5-reduce_fit', self._context_args): if (not hasattr(self, 'fitted')) and (self._props['with_reducer']): if self.backend == 'dask_ml': # We have to convert any type of data array into a Dask array because # dask_ml cannot handle anything else (!) #todo Raise an issue on dask_ml github to ask why is this choice made # Related issues: # https://github.com/dask/dask-ml/issues/6 # https://github.com/dask/dask-ml/issues/541 # https://github.com/dask/dask-ml/issues/542 X.data = dask.array.asarray(X.data, chunks=X.shape) if isinstance(X.data, dask.array.Array): self._reducer[feature_name].fit(X.data) else: self._reducer[feature_name].fit(X) with self._context(this_context + '.6-reduce_transform', self._context_args): X = self._reducer[feature_name].transform(X.data) # Reduction, return np.array # After reduction the new array is [ sampling, reduced_dim ] X = xr.DataArray(X, dims=['sampling', 'n_features'], coords={'sampling': range(0, X.shape[0]), 'n_features': np.arange(0,X.shape[1])}) if self._debug: print("\t", "X REDUCED with success)", str(LogDataType(X))) # Output: return X, sampling_dims def preprocessing(self, ds, features=None, dim=None, action='?', mask=None): """ Dataset pre-processing of feature(s) Depending on pyXpcm set-up, pre-processing steps can be: - interpolation, - scaling, - reduction Parameters ---------- ds: :class:`xarray.Dataset` The dataset to work with features: dict() Definitions of PCM features in the input :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim: str Name of the vertical dimension in the input :class:`xarray.Dataset` Returns ------- X: np.array Pre-processed set of features, with dimensions (N_SAMPLE, N_FEATURES) sampling_dims: list() List of the input :class:`xarray.Dataset` dimensions stacked as sampling points """ this_context = str(action)+'.1-preprocess' with self._context(this_context, self._context_args): if self._debug: print("> Start preprocessing for action '%s'" % action) # How do we find feature variable in this dataset ? features_dict = ds.pyxpcm.feature_dict(self, features=features) # Determine mask where all features are defined for this PCM: with self._context(this_context + '.1-mask', self._context_args): if not mask: mask = ds.pyxpcm.mask(self, features=features, dim=dim) # Stack all-features mask: mask = mask.stack({'sampling': list(mask.dims)}) self._xmask = mask # Pre-process all features and build the X array X = np.empty(()) Xlabel = list() # Construct a list of string labels for each feature dimension (useful for plots) F = self.F # Nb of features for feature_in_pcm in features_dict: feature_in_ds = features_dict[feature_in_pcm] if self._debug: print( ("\n\t> Preprocessing xarray dataset '%s' as PCM feature '%s'")\ %(feature_in_ds, feature_in_pcm) ) if ('maxlevel' in self._context_args) and (self._context_args['maxlevel'] <= 2): a = this_context + '.2-features' else: a = this_context with self._context(a, self._context_args): da = ds[feature_in_ds] x, sampling_dims = self.preprocessing_this(da, dim=dim, feature_name=feature_in_pcm, action=action) xlabel = ["%s_%i"%(feature_in_pcm, i) for i in range(0, x.shape[1])] if self._debug: print("\t%s pre-processed with success, " % feature_in_pcm, str(LogDataType(x))) with self._context(this_context + '.3-homogeniser', self._context_args): # Store full array mean and std during fit: if F>1: # For more than 1 feature, we need to make them comparable, # so we normalise each features by their global stats: # FIT: if (action == 'fit') or (action == 'fit_predict'): self._homogeniser[feature_in_pcm]['mean'] = x.mean().values self._homogeniser[feature_in_pcm]['std'] = x.std().values #todo _homogeniser should be a proper standard scaler # TRANSFORM: x = (x-self._homogeniser[feature_in_pcm]['mean'])/\ self._homogeniser[feature_in_pcm]['std'] if self._debug and action == 'fit': print(("\tHomogenisation for fit of %s") % (feature_in_pcm)) elif self._debug: print(("\tHomogenisation of %s using fit data") % (feature_in_pcm)) elif self._debug: print(("\tNo need for homogenisation of %s") % (feature_in_pcm)) if np.prod(X.shape) == 1: X = x Xlabel = xlabel else: X = np.append(X, x, axis=1) [Xlabel.append(i) for i in xlabel] with self._context(this_context + '.4-xarray', self._context_args): self._xlabel = Xlabel if self._debug: print("\tFeatures array shape and type for xarray:", X.shape, type(X), type(X.data)) X = xr.DataArray(X, dims=['n_samples', 'n_features'], coords={'n_samples': range(0, X.shape[0]), 'n_features': Xlabel}) if self._debug: print("> Preprocessing done, working with final X (%s) array of shape:" % type(X), X.shape, " and sampling dimensions:", sampling_dims) return X, sampling_dims
[docs] def fit(self, ds, features=None, dim=None): """Estimate PCM parameters For a PCM, the fit method consists in the following operations: - pre-processing - interpolation to the ``feature_axis`` levels of the model - scaling - reduction - estimate classifier parameters Parameters ---------- ds: :class:`xarray.Dataset` The dataset to work with features: dict() Definitions of PCM features in the input :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim: str Name of the vertical dimension in the input :class:`xarray.Dataset` Returns ------- self """ with self._context('fit', self._context_args) : # PRE-PROCESSING: X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='fit') # CLASSIFICATION-MODEL TRAINING: with self._context('fit.fit', self._context_args): self._classifier.fit(X) with self._context('fit.score', self._context_args): self._props['llh'] = self._classifier.score(X) # Furthermore gather some information about the fit: self._fit_stats['score'] = self._props['llh'] self._fit_stats['datetime'] = datetime.utcnow() if 'n_samples_seen_' not in self._classifier.__dict__: self._fit_stats['n_samples_seen_'] = X.shape[0] else: self._fit_stats['n_samples_seen_'] = self._classifier.n_samples_seen_ if 'n_iter_' in self._classifier.__dict__: self._fit_stats['n_iter_'] = self._classifier.n_iter_ # Done: self.fitted = True return self
[docs] def predict(self, ds, features=None, dim=None, inplace=False, name='PCM_LABELS'): """Predict labels for profile samples This method add these properties to the PCM object: - ``llh``: The log likelihood of the model with regard to new data Parameters ---------- ds: :class:`xarray.Dataset` The dataset to work with features: dict() Definitions of PCM features in the input :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim: str Name of the vertical dimension in the input :class:`xarray.Dataset` inplace: boolean, False by default If False, return a :class:`xarray.DataArray` with predicted labels If True, return the input :class:`xarray.Dataset` with labels added as a new :class:`xarray.DataArray` name: str, default is 'PCM_LABELS' Name of the :class:`xarray.DataArray` with labels Returns ------- :class:`xarray.DataArray` Component labels (if option 'inplace' = False) *or* :class:`xarray.Dataset` Input dataset with Component labels as a 'PCM_LABELS' new :class:`xarray.DataArray` (if option 'inplace' = True) """ with self._context('predict', self._context_args): # Check if the PCM is trained: validation.check_is_fitted(self, 'fitted') # PRE-PROCESSING: X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='predict') # CLASSIFICATION PREDICTION: with self._context('predict.predict', self._context_args): labels = self._classifier.predict(X) with self._context('predict.score', self._context_args): llh = self._classifier.score(X) # Create a xarray with labels output: with self._context('predict.xarray', self._context_args): da = self.unravel(ds, sampling_dims, labels).rename(name) da.attrs['long_name'] = 'PCM labels' da.attrs['units'] = '' da.attrs['valid_min'] = 0 da.attrs['valid_max'] = self._props['K']-1 da.attrs['llh'] = llh # Preserve attributes of coordinates: for c in da.coords: da[c].attrs = ds[c].attrs # Add labels to the dataset: if inplace: return ds.pyxpcm.add(da) else: return da
[docs] def fit_predict(self, ds, features=None, dim=None, inplace=False, name='PCM_LABELS'): """Estimate PCM parameters and predict classes. This method add these properties to the PCM object: - ``llh``: The log likelihood of the model with regard to new data Parameters ---------- ds: :class:`xarray.Dataset` The dataset to work with features: dict() Definitions of PCM features in the input :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim: str Name of the vertical dimension in the input :class:`xarray.Dataset` inplace: boolean, False by default If False, return a :class:`xarray.DataArray` with predicted labels If True, return the input :class:`xarray.Dataset` with labels added as a new :class:`xarray.DataArray` name: string ('PCM_LABELS') Name of the DataArray holding labels. Returns ------- :class:`xarray.DataArray` Component labels (if option 'inplace' = False) *or* :class:`xarray.Dataset` Input dataset with component labels as a 'PCM_LABELS' new :class:`xarray.DataArray` (if option 'inplace' = True) """ with self._context('fit_predict', self._context_args): # PRE-PROCESSING: X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='fit_predict') # CLASSIFICATION-MODEL TRAINING: with self._context('fit_predict.fit', self._context_args): self._classifier.fit(X) with self._context('fit_predict.score', self._context_args): self._props['llh'] = self._classifier.score(X) # Furthermore gather some information about this fit: self._fit_stats['score'] = self._props['llh'] if 'n_samples_seen_' not in self._classifier.__dict__: self._fit_stats['n_samples_seen_'] = X.shape[0] else: self._fit_stats['n_samples_seen_'] = self._classifier.n_samples_seen_ if 'n_iter_' in self._classifier.__dict__: self._fit_stats['n_iter_'] = self._classifier.n_iter_ # Done: self.fitted = True # CLASSIFICATION PREDICTION: with self._context('fit_predict.predict', self._context_args): labels = self._classifier.predict(X) # Create a xarray with labels output: with self._context('fit_predict.xarray', self._context_args): da = self.unravel(ds, sampling_dims, labels).rename(name) da.attrs['long_name'] = 'PCM labels' da.attrs['units'] = '' da.attrs['valid_min'] = 0 da.attrs['valid_max'] = self._props['K']-1 da.attrs['llh'] = self._props['llh'] # Preserve attributes of coordinates: for c in da.coords: da[c].attrs = ds[c].attrs # Add labels to the dataset: if inplace: return ds.pyxpcm.add(da) else: return da
[docs] def predict_proba(self, ds, features=None, dim=None, inplace=False, name='PCM_POST', classdimname='pcm_class'): """Predict posterior probability of each components given the data This method adds these properties to the PCM instance: - ``llh``: The log likelihood of the model with regard to new data Parameters ---------- ds: :class:`xarray.Dataset` The dataset to work with features: dict() Definitions of PCM features in the input :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim: str Name of the vertical dimension in the input :class:`xarray.Dataset` inplace: boolean, False by default If False, return a :class:`xarray.DataArray` with predicted probabilities If True, return the input :class:`xarray.Dataset` with probabilities added as a new :class:`xarray.DataArray` name: str, default is 'PCM_POST' Name of the DataArray with prediction probability (posteriors) classdimname: str, default is 'pcm_class' Name of the dimension holding classes Returns ------- :class:`xarray.DataArray` Probability of each Gaussian (state) in the model given each sample (if option 'inplace' = False) *or* :class:`xarray.Dataset` Input dataset with Component Probability as a 'PCM_POST' new :class:`xarray.DataArray` (if option 'inplace' = True) """ with self._context('predict_proba', self._context_args): # Check if the PCM is trained: validation.check_is_fitted(self, 'fitted') # PRE-PROCESSING: X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='predict_proba') # CLASSIFICATION PREDICTION: with self._context('predict_proba.predict', self._context_args): post_values = self._classifier.predict_proba(X) with self._context('predict_proba.score', self._context_args): self._props['llh'] = self._classifier.score(X) # Create a xarray with posteriors: with self._context('predict_proba.xarray', self._context_args): P = list() for k in range(self.K): X = post_values[:, k] x = self.unravel(ds, sampling_dims, X) P.append(x) da = xr.concat(P, dim=classdimname).rename(name) da.attrs['long_name'] = 'PCM posteriors' da.attrs['units'] = '' da.attrs['valid_min'] = 0 da.attrs['valid_max'] = 1 da.attrs['llh'] = self._props['llh'] # Preserve attributes of coordinates: for c in da.coords: da[c].attrs = ds[c].attrs # Add posteriors to the dataset: if inplace: return ds.pyxpcm.add(da) else: return da
[docs] def score(self, ds, features=None, dim=None): """Compute the per-sample average log-likelihood of the given data Parameters ---------- ds: :class:`xarray.Dataset` The dataset to work with features: dict() Definitions of PCM features in the input :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim: str Name of the vertical dimension in the input :class:`xarray.Dataset` Returns ------- log_likelihood: float In the case of a GMM classifier, this is the Log likelihood of the Gaussian mixture given data """ with self._context('score', self._context_args): # Check if the PCM is trained: validation.check_is_fitted(self, 'fitted') # PRE-PROCESSING: X, sampling_dims = self.preprocessing(ds, features=features, action='score') # COMPUTE THE PREDICTION SCORE: with self._context('score.score', self._context_args): llh = self._classifier.score(X) return llh
[docs] def bic(self, ds, features=None, dim=None): """Compute Bayesian information criterion for the current model on the input dataset Only for a GMM classifier Parameters ---------- ds: :class:`xarray.Dataset` The dataset to work with features: dict() Definitions of PCM features in the input :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim: str Name of the vertical dimension in the input :class:`xarray.Dataset` Returns ------- bic: float The lower the better """ with self._context('bic', self._context_args): # Check classifier: if self._props['with_classifier'] != 'gmm': raise Exception( ("BIC is only available for the 'gmm' classifier ('%s')")%\ (self._props['with_classifier']) ) def _n_parameters(_classifier): """Return the number of free parameters in the model. See sklearn code""" _, n_features = _classifier.means_.shape if _classifier.covariance_type == 'full': cov_params = _classifier.n_components * n_features * (n_features + 1) / 2. elif _classifier.covariance_type == 'diag': cov_params = _classifier.n_components * n_features elif _classifier.covariance_type == 'tied': cov_params = n_features * (n_features + 1) / 2. elif _classifier.covariance_type == 'spherical': cov_params = _classifier.n_components mean_params = n_features * _classifier.n_components return int(cov_params + mean_params + _classifier.n_components - 1) # Check if the PCM is trained: validation.check_is_fitted(self, 'fitted') # PRE-PROCESSING: X, sampling_dims = self.preprocessing(ds, features=features, action='bic') # COMPUTE THE log-likelihood: with self._context('bic.score', self._context_args): llh = self._classifier.score(X) # COMPUTE BIC: N_samples = X.shape[0] bic = (-2 * llh * N_samples + _n_parameters(self._classifier) * np.log(N_samples)) return bic