Source code for pyxpcm.models

# -*- coding: utf-8 -*-
"""

.. module:: pyxpcm
   :synopsis: Profile Classification Model

.. moduleauthor:: Guillaume Maze <gmaze@ifremer.fr>

Multi-variable classification, i.e. use of more than one physical variable as PCM features

Created on 2019/09/27
@author: G. Maze (Ifremer/LOPS)
"""

import os
import sys
import numpy as np
import pandas as pd
import xarray as xr
import collections
import inspect
import dask

import warnings
import time
from contextlib import contextmanager
from datetime import datetime

# Internal:
from .plot import _PlotMethods
from .stat import _StatMethods
from .utils import LogDataType, Vertical_Interpolator, NoTransform, StatisticsBackend, docstring
from . import io

# Scikit-learn useful methods:
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline
from sklearn.utils import validation
from sklearn.utils import assert_all_finite
from sklearn.exceptions import NotFittedError

####### Scikit-learn statistics backend:
# http://scikit-learn.org/stable/modules/mixture.html
from sklearn.mixture import GaussianMixture

class PCMFeatureError(Exception):
    """Exception raised when features are not correct"""

class PCMClassError(Exception):
    """Exception raised when classes are not correct"""

class pcm(object):
    """Profile Classification Model class constructor

    Consume and return :mod:`xarray` objects
    """
    def __init__(self,
                 K: int,
                 features: dict(),
                 scaling=1,
                 reduction=1, maxvar=15,
                 classif='gmm', covariance_type='full',
                 verb=False, debug=False,
                 timeit=False, timeit_verb=False,
                 chunk_size='auto',
                 backend='sklearn'):
        """Create the PCM instance

        Parameters
        ----------
        K: int
            The number of classes, or clusters, in the classification model.
        features: dict()
            The vertical axis to use for each feature.
            e.g.: {'temperature': np.arange(-2000, 0, 1)}
        scaling: int (default: 1)
            Define the scaling method:

            - 0: No scaling
            - **1: Center on sample mean and scale by sample std**
            - 2: Center on sample mean only
        reduction: int (default: 1)
            Define the dimensionality reduction method:

            - 0: No reduction
            - **1: Reduction using :class:`sklearn.decomposition.PCA`**
        maxvar: int (default: 15)
            Maximum feature variance to preserve in the reduced dataset
            using :class:`sklearn.decomposition.PCA`. In %.
        classif: str (default: 'gmm')
            Define the classification method.
            The only method available as of now is a Gaussian Mixture Model.
            See :class:`sklearn.mixture.GaussianMixture` for more details.
        covariance_type: str (default: 'full')
            Define the type of covariance matrix shape to be used in the default classifier GMM.
            It can be 'full' (default), 'tied', 'diag' or 'spherical'.
        verb: boolean (default: False)
            More verbose output
        timeit: boolean (default: False)
            Register time of operation for performance evaluation
        timeit_verb: boolean (default: False)
            Print time of operation during execution
        chunk_size: 'auto' or int
            Sampling chunk size (array of features after pre-processing)
        backend: str
            Statistic library backend, 'sklearn' (default) or 'dask_ml'
        """
        if K == 0:
            raise PCMClassError("Can't create a PCM with K=0")
        if K is None:
            raise PCMClassError("K must be defined to create a PCM")
        if not bool(features):
            raise PCMFeatureError("Can't create a PCM without features")

        if scaling == 0:
            with_scaler = 'none'; with_mean = False; with_std = False
        elif scaling == 1:
            with_scaler = 'normal'; with_mean = True; with_std = True
        elif scaling == 2:
            with_scaler = 'center'; with_mean = True; with_std = False
        else:
            raise NameError('scaling must be 0, 1 or 2')

        if reduction == 0:
            with_reducer = False
        elif reduction == 1:
            with_reducer = True
        else:
            raise NameError('reduction must be 0 or 1')

        if classif == 'gmm':
            with_classifier = 'gmm'
        else:
            raise NameError("classifier must be 'gmm' (no other methods implemented at this time)")

        #todo check validity of the dict of features

        self._props = {'K': int(K),
                       'F': len(features),
                       'llh': None,
                       'COVARTYPE': covariance_type,
                       'with_scaler': with_scaler,
                       'with_reducer': with_reducer,
                       'with_classifier': with_classifier,
                       'maxvar': maxvar,
                       'features': collections.OrderedDict(features),
                       'chunk_size': chunk_size,
                       'backend': backend,
                       'cmap': None}
        self._xmask = None  # xarray mask for nd-array used at pre-processing steps
        self._verb = verb  #todo _verb is a property, should be set/get with a decorator
        self._debug = debug
        self._interpoler = collections.OrderedDict()
        self._scaler = collections.OrderedDict()
        self._scaler_props = collections.OrderedDict()
        self._reducer = collections.OrderedDict()
        self._homogeniser = collections.OrderedDict()

        # Load estimators for a specific backend:
        bck = StatisticsBackend(backend, scaler='StandardScaler', reducer='PCA')

        for feature_name in features:
            feature_axis = self._props['features'][feature_name]
            if isinstance(feature_axis, xr.DataArray):
                self._props['features'][feature_name] = feature_axis.values

            # self._scaler[feature_name] = preprocessing.StandardScaler(with_mean=with_mean,
            #                                                           with_std=with_std)
            if 'none' not in self._props['with_scaler']:
                self._scaler[feature_name] = bck.scaler(with_mean=with_mean, with_std=with_std)
            else:
                self._scaler[feature_name] = NoTransform()
            self._scaler_props[feature_name] = {'units': '?'}

            is_slice = np.all(feature_axis == None)
            if not is_slice:
                self._interpoler[feature_name] = Vertical_Interpolator(axis=feature_axis, debug=self._debug)
                if np.prod(feature_axis.shape) == 1:
                    # Single level: no need to reduce
                    if self._debug:
                        print('Single level, no need to reduce', np.prod(feature_axis.ndim))
                    self._reducer[feature_name] = NoTransform()
                else:
                    # Multi-vertical-levels, set reducer:
                    if with_reducer:
                        self._reducer[feature_name] = bck.reducer(n_components=self._props['maxvar'],
                                                                  svd_solver='full')
                    else:
                        self._reducer[feature_name] = NoTransform()
            else:
                self._interpoler[feature_name] = NoTransform()
                self._reducer[feature_name] = NoTransform()
                if self._debug:
                    print("%s is single level, no need to reduce" % feature_name)

            self._homogeniser[feature_name] = {'mean': 0, 'std': 1}

        self._classifier = GaussianMixture(n_components=self._props['K'],
                                           covariance_type=self._props['COVARTYPE'],
                                           init_params='kmeans',
                                           max_iter=1000,
                                           tol=1e-6)

        # Define the "context" to execute some functions inner code
        # (useful for time benchmarking)
        self._context = self.__empty_context  # Default is empty, do nothing
        self._context_args = dict()
        if timeit:
            self._context = self.__timeit_context
            self._context_args = {'maxlevel': 3, 'verb': timeit_verb}
            self._timeit = dict()

        # Define statistics for the fit method:
        self._fit_stats = dict({'datetime': None,
                                'n_samples_seen_': None,
                                'score': None,
                                'etime': None})
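
    # Illustrative usage sketch (not part of the original module): creating a PCM with a
    # single 'temperature' feature on a prescribed vertical axis, mirroring the docstring
    # example above. The feature name and axis values are assumptions for the example only.
    #
    #     >>> import numpy as np
    #     >>> from pyxpcm.models import pcm
    #     >>> z = np.arange(-1000., 0., 10.)
    #     >>> m = pcm(K=8, features={'temperature': z})
    #     >>> print(m)  # __repr__ calls display()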
    @contextmanager
    def __timeit_context(self, name, opts=dict()):
        default_opts = {'maxlevel': np.inf, 'verb': False}
        for key in opts:
            if key in default_opts:
                default_opts[key] = opts[key]
        level = len([i for i in range(len(name)) if name.startswith('.', i)])
        if level <= default_opts['maxlevel']:
            startTime = time.time()
            yield
            elapsedTime = time.time() - startTime
            trailingspace = " " * level
            trailingspace = " "
            if default_opts['verb']:
                # print('... time in {} {}: {} ms'.format(trailingspace, name, int(elapsedTime * 1000)))
                print('{} {}: {} ms'.format(trailingspace, name, int(elapsedTime * 1000)))
            if name in self._timeit:
                self._timeit[name].append(elapsedTime * 1000)
            else:
                self._timeit[name] = list([elapsedTime * 1000])
        else:
            yield

    @contextmanager
    def __empty_context(self, name, *args, **kargs):
        yield

    def __call__(self, **kwargs):
        self.__init__(**kwargs)

    def __iter__(self):
        self.__i = 0
        return self

    def __next__(self):
        if self.__i < self.K:
            i = self.__i
            self.__i += 1
            return i
        else:
            raise StopIteration()

    def __repr__(self):
        return self.display(deep=self._verb)

    # def xmerge(self, ds, da):
    #     """ Add a new :class:`xarray.DataArray` to a :class:`xarray.Dataset` """
    #
    #     if da.name in ds.data_vars:
    #         warnings.warn(("%s variable already in the dataset: overwriting") % (da.name))
    #
    #     # Add pyXpcm tracking clues:
    #     da.attrs['comment'] = "Automatically added by pyXpcm"
    #
    #     # vname = da.name
    #     # self._obj[da.name] = da
    #     ds = xr.merge([ds, da])
    #     return ds
    #
    # def __clean(self, ds):
    #     """ Remove all variables created with pyXpcm from a :class:`xarray.Dataset` """
    #     # See add() method to identify these variables.
    #     for vname in ds.data_vars:
    #         if ("comment" in ds[vname].attrs) \
    #                 and (ds[vname].attrs['comment'] == "Automatically added by pyXpcm"):
    #             ds = ds.drop(vname)
    #     return ds
    def ravel(self, da, dim=None, feature_name=str):
        """ Extract from a N-d array a X(feature, sample) 2-d array and the vertical axis z

        Parameters
        ----------
        da: :class:`xarray.DataArray`
            The DataArray to process
        dim: str
            Name of the vertical dimension in the input :class:`xarray.DataArray`
        feature_name: str
            Target PCM feature name for the input :class:`xarray.DataArray`

        Returns
        -------
        X: :class:`xarray.DataArray`
            A new DataArray with dimensions ['n_sampling', 'n_features'].
            Note that data are always :class:`dask.array.Array`.
        z: :class:`numpy.array`
            The vertical axis of data
        sampling_dims: dict()
            Dictionary where keys are :class:`xarray.Dataset` variable names of features
            and values are another dictionary with the list of sampling dimensions in the
            ``DIM_SAMPLING`` key and the name of the vertical axis in the ``DIM_VERTICAL`` key.

        Examples
        --------
        This function is meant to be used internally only

        __author__: gmaze@ifremer.fr
        """
        # Is this a thick array or a slice ?
        is_slice = np.all(self._props['features'][feature_name] == None)

        # Load mask where all features are available for this PCM:
        mask_stacked = self._xmask

        if is_slice:
            # No vertical dimension to use, simple stacking
            sampling_dims = list(da.dims)
            # Apply all-features mask:
            X = da.stack({'sampling': sampling_dims})
            X = X.where(mask_stacked == 1, drop=True).expand_dims('dummy').transpose()  # .values
            z = np.empty((1,))
        else:
            if not dim:
                # Try to infer the vertical dimension name looking for the CF 'axis' attribute in all dimensions
                dim_found = False
                for this_dim in da.dims:
                    if ('axis' in da[this_dim].attrs) and (da[this_dim].attrs['axis'] == 'Z'):
                        dim = this_dim
                        dim_found = True
                if not dim_found:
                    raise PCMFeatureError("You must specify a vertical dimension name: "
                                          "use argument 'dim' or "
                                          "set the DataSet dimension attribute 'axis' to 'Z' (CF 1.6)")
            elif dim not in da.dims:
                raise ValueError("Vertical dimension %s not found in this DataArray" % dim)

            sampling_dims = list(da.dims)
            sampling_dims.remove(dim)
            X = da.stack({'sampling': sampling_dims})  #todo Improve performance for this operation !
            # Apply all-features mask:
            X = X.where(mask_stacked == 1, drop=True).transpose()
            z = da[dim].values

        X = X.chunk(chunks={'sampling': self._props['chunk_size']})
        return X, z, sampling_dims
    def unravel(self, ds, sampling_dims, X):
        """ Create a DataArray from a numpy array and sampling dimensions """

        # Load mask where all features are available for this PCM:
        mask_stacked = self._xmask

        coords = list()
        size = list()
        for dim in sampling_dims:
            coords.append(ds[dim])
            size.append(len(ds[dim]))
        da = xr.DataArray(np.empty((size)), coords=coords)
        da = da.stack({'sampling': sampling_dims})
        da = da.where(mask_stacked == 1, drop=True).transpose()
        da.values = X
        da = da.unstack('sampling')
        if np.prod(da.shape) != mask_stacked.shape[0]:
            if self._debug:
                print("\tUnravelled data not matching mask dimension, re-indexing")
            mask = mask_stacked.unstack()
            da = da.reindex_like(mask)
        return da
    @property
    def K(self):
        """Return the number of classes"""
        return self._props['K']

    @property
    def F(self):
        """Return the number of features"""
        return self._props['F']

    @property
    def features(self):
        """Return the features definition dictionary"""
        return self._props['features']

    @property
    def plot(self):
        """Access plotting functions"""
        self._plt = _PlotMethods(self)
        return self._plt

    @property
    def stat(self):
        """Access statistics functions"""
        return _StatMethods(self)

    @property
    def timeit(self):
        """ Return a :class:`pandas.DataFrame` with the execution time of each method called on this instance """

        def get_multindex(times):
            """ Create a pandas multi-index """
            # Get max levels:
            dpt = list()
            [dpt.append(len(key.split("."))) for key in times]
            max_dpt = np.max(dpt)
            # Read index:
            levels_1 = list()
            levels_2 = list()
            levels_3 = list()
            levels_4 = list()
            if max_dpt == 1:
                for key in times:
                    levels = key.split(".")
                    levels_1.append(levels[0])
                return max_dpt, [levels_1]
            elif max_dpt == 2:
                for key in times:
                    levels = key.split(".")
                    if len(levels) == 1:
                        levels_1.append(levels[0])
                        levels_2.append('total')
                    if len(levels) == 2:
                        levels_1.append(levels[0])
                        levels_2.append(levels[1])
                return max_dpt, [levels_1, levels_2]
            elif max_dpt == 3:
                for key in times:
                    levels = key.split(".")
                    # print(len(levels), levels)
                    if len(levels) == 1:
                        levels_1.append(levels[0])
                        levels_2.append('total')
                        levels_3.append('')
                    if len(levels) == 2:
                        levels_1.append(levels[0])
                        levels_2.append(levels[1])
                        levels_3.append('total')
                    if len(levels) == 3:
                        levels_1.append(levels[0])
                        levels_2.append(levels[1])
                        levels_3.append(levels[2])
                return max_dpt, [levels_1, levels_2, levels_3]
            elif max_dpt == 4:
                for key in times:
                    levels = key.split(".")
                    if len(levels) == 1:
                        levels_1.append(levels[0])
                        levels_2.append('total')
                        levels_3.append('')
                        levels_4.append('')
                    if len(levels) == 2:
                        levels_1.append(levels[0])
                        levels_2.append(levels[1])
                        levels_3.append('total')
                        levels_4.append('')
                    if len(levels) == 3:
                        levels_1.append(levels[0])
                        levels_2.append(levels[1])
                        levels_3.append(levels[2])
                        levels_4.append('total')
                    if len(levels) == 4:
                        levels_1.append(levels[0])
                        levels_2.append(levels[1])
                        levels_3.append(levels[2])
                        levels_4.append(levels[3])
                return max_dpt, [levels_1, levels_2, levels_3, levels_4]

        times = self._timeit
        max_dpt, arrays = get_multindex(times)

        if max_dpt == 1:
            index = pd.Index(arrays[0], names=['Method'])
            df = pd.Series([np.sum(times[key]) for key in times], index=index)
            # df = df.T
        elif max_dpt == 2:
            tuples = list(zip(*arrays))
            index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method'])
            df = pd.Series([np.sum(times[key]) for key in times], index=index)
            df = df.unstack(0)
            df = df.drop('total')
            df = df.T
        elif max_dpt == 3:
            tuples = list(zip(*arrays))
            index = pd.MultiIndex.from_tuples(tuples, names=['Method', 'Sub-method', 'Sub-sub-method'])
            df = pd.Series([np.sum(times[key]) for key in times], index=index)
            # df = df.unstack(0)
        elif max_dpt == 4:
            tuples = list(zip(*arrays))
            index = pd.MultiIndex.from_tuples(tuples,
                                              names=['Method', 'Sub-method', 'Sub-sub-method', 'Sub-sub-sub-method'])
            df = pd.Series([np.sum(times[key]) for key in times], index=index)

        return df

    @property
    def backend(self):
        """Return the name of the statistics backend"""
        return self._props['backend']

    @property
    def fitstats(self):
        """ Estimator fit properties

        The number of samples processed by the estimator.
        Will be reset on new calls to fit, but increments across partial_fit calls.
        """
        return self._fit_stats
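
    # Illustrative sketch (assumption, not from the original source): when the instance is
    # created with ``timeit=True``, accumulated execution times can be inspected through the
    # ``timeit`` property after calling fit/predict. ``ds`` and 'TEMP' are hypothetical names.
    #
    #     >>> m = pcm(K=4, features={'temperature': np.arange(-500., 0., 5.)}, timeit=True)
    #     >>> m.fit(ds, features={'temperature': 'TEMP'})
    #     >>> m.timeit  # times in ms, indexed by method name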
    @docstring(io.to_netcdf.__doc__)
    def to_netcdf(self, ncfile, **ka):
        """ Save the PCM to a netcdf file

        Parameters
        ----------
        path : str
            Path to the file
        """
        return io.to_netcdf(self, ncfile, **ka)
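
    # Illustrative sketch: saving a fitted model to disk (the file name is hypothetical):
    #
    #     >>> m.to_netcdf('my_pcm.nc')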
    def display(self, deep=False):
        """Display detailed parameters of the PCM

        This is not a get_params because it doesn't return a dictionary.
        Set the boolean option 'deep' to True to display all properties.
        """
        summary = [("<pcm '%s' (K: %i, F: %i)>") % (self._props['with_classifier'],
                                                    self._props['K'],
                                                    len(self._props['features']))]

        # PCM core properties:
        prop_info = ('Number of class: %i') % self._props['K']
        summary.append(prop_info)
        prop_info = ('Number of feature: %i') % len(self._props['features'])
        summary.append(prop_info)

        prop_info = ('Feature names: %s') % (repr(self._props['features'].keys()))
        summary.append(prop_info)

        # prop_info = ('Feature axis: [%s, ..., %s]') % (repr(self._props['features'][0]),
        #                                                repr(self._props['feature_axis'][-1]))
        # summary.append(prop_info)

        prop_info = ('Fitted: %r') % hasattr(self, 'fitted')
        summary.append(prop_info)

        # PCM workflow parameters:
        for feature in self._props['features']:
            prop_info = "Feature: '%s'" % feature
            summary.append(prop_info)
            summary.append("\t Interpoler: %s" % (type(self._interpoler[feature])))

            # prop_info = ('\t Sample Scaling: %r') %
            # summary.append(prop_info)
            summary.append("\t Scaler: %r, %s" % (self._props['with_scaler'], type(self._scaler[feature])))

            if deep:
                # summary.append("\t\t Scaler properties:")
                d = self._scaler[feature].get_params(deep=deep)
                for p in d:
                    summary.append(("\t\t %s: %r") % (p, d[p]))

            # prop_info = ('\t Dimensionality Reduction: %r') %
            # summary.append(prop_info)
            summary.append("\t Reducer: %r, %s" % (self._props['with_reducer'], type(self._reducer[feature])))

            if deep:
                # summary.append("\t\t Reducer properties:")
                d = self._reducer[feature].get_params(deep=deep)
                for p in d:
                    summary.append(("\t\t %s: %r") % (p, d[p]))

        # return '\n'.join(summary)

        # prop_info = ('Classification: %r') %
        # summary.append(prop_info)
        summary.append("Classifier: %r, %s" % (self._props['with_classifier'], type(self._classifier)))
        # prop_info = ('GMM covariance type: %s') % self._props['COVARTYPE']
        # summary.append(prop_info)
        if hasattr(self, 'fitted'):
            prop_info = ('\t log likelihood of the training set: %f') % self._props['llh']
            summary.append(prop_info)

        if deep:
            summary.append("\t Classifier properties:")
            d = self._classifier.get_params(deep=deep)
            for p in d:
                summary.append(("\t\t %s: %r") % (p, d[p]))

        # Done
        return '\n'.join(summary)

    def preprocessing_this(self, da, dim=None, feature_name=str(), action='?'):
        """Pre-process data before anything

        Possible pre-processing steps:

        - interpolation,
        - scaling,
        - reduction

        Parameters
        ----------
        da: :class:`xarray.DataArray`
            The DataArray to process
        dim: str
            Name of the vertical dimension in the input :class:`xarray.DataArray`
        feature_name: str
            Target PCM feature name for the input :class:`xarray.DataArray`

        Returns
        -------
        X: np.array
            Pre-processed feature, with dimensions (N_SAMPLE, N_FEATURES)
        sampling_dims: list()
            List of the input :class:`xarray.DataArray` dimensions stacked as sampling points
        """
        this_context = str(action) + '.1-preprocess.2-feature_' + feature_name
        with self._context(this_context + '.total', self._context_args):

            # MAKE THE ND-ARRAY A 2D-ARRAY
            with self._context(this_context + '.1-ravel', self._context_args):
                X, z, sampling_dims = self.ravel(da, dim=dim, feature_name=feature_name)
                if self._debug:
                    print("\t", "X RAVELED with success", str(LogDataType(X)))

            # INTERPOLATION STEP:
            with self._context(this_context + '.2-interp', self._context_args):
                X = self._interpoler[feature_name].transform(X, z)
                if self._debug:
                    if isinstance(self._interpoler[feature_name], NoTransform):
                        print("\t", "X INTERPOLATED with success (NoTransform)", str(LogDataType(X)))
                    else:
                        print("\t", "X INTERPOLATED with success", str(LogDataType(X)))
                        # print(X.values.flags['WRITEABLE'])
            # After the interpolation step, we must not have nan in the 2d array:
            assert_all_finite(X, allow_nan=False)

            # FIT STEPS:
            # We need to fit pre-processing methods in order to re-use them when
            # predicting a new dataset

            # SCALING:
            with self._context(this_context + '.3-scale_fit', self._context_args):
                if not hasattr(self, 'fitted'):
                    self._scaler[feature_name].fit(X.data)
                    if 'units' in da.attrs:
                        self._scaler_props[feature_name]['units'] = da.attrs['units']

            with self._context(this_context + '.4-scale_transform', self._context_args):
                try:
                    X.data = self._scaler[feature_name].transform(X.data, copy=False)
                except ValueError:
                    if self._debug:
                        print("\t\t Fail to scale.transform without copy, fall back on copy=True")
                    try:
                        X.data = self._scaler[feature_name].transform(X.data, copy=True)
                    except ValueError:
                        if self._debug:
                            print("\t\t Fail to scale.transform with copy, fall back on input copy")
                        X.data = self._scaler[feature_name].transform(X.data.copy())
                    except:
                        if self._debug:
                            print(X.values.flags['WRITEABLE'])
                        raise
                except:
                    raise

                if self._debug:
                    print("\t", "X SCALED with success", str(LogDataType(X)))

            # REDUCTION:
            with self._context(this_context + '.5-reduce_fit', self._context_args):
                if (not hasattr(self, 'fitted')) and (self._props['with_reducer']):

                    if self.backend == 'dask_ml':
                        # We have to convert any type of data array into a Dask array because
                        # dask_ml cannot handle anything else (!)
                        #todo Raise an issue on dask_ml github to ask why this choice was made
                        # Related issues:
                        #   https://github.com/dask/dask-ml/issues/6
                        #   https://github.com/dask/dask-ml/issues/541
                        #   https://github.com/dask/dask-ml/issues/542
                        X.data = dask.array.asarray(X.data, chunks=X.shape)

                    if isinstance(X.data, dask.array.Array):
                        self._reducer[feature_name].fit(X.data)
                    else:
                        self._reducer[feature_name].fit(X)

            with self._context(this_context + '.6-reduce_transform', self._context_args):
                X = self._reducer[feature_name].transform(X.data)  # Reduction, return np.array

            # After reduction the new array is [sampling, reduced_dim]
            X = xr.DataArray(X,
                             dims=['sampling', 'n_features'],
                             coords={'sampling': range(0, X.shape[0]),
                                     'n_features': np.arange(0, X.shape[1])})
            if self._debug:
                print("\t", "X REDUCED with success", str(LogDataType(X)))

        # Output:
        return X, sampling_dims

    def preprocessing(self, ds, features=None, dim=None, action='?', mask=None):
        """ Dataset pre-processing of feature(s)

        Depending on the pyXpcm set-up, pre-processing steps can be:

        - interpolation,
        - scaling,
        - reduction

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            The dataset to work with
        features: dict()
            Definitions of PCM features in the input :class:`xarray.Dataset`.
            If not specified or set to None, features are identified
            using :class:`xarray.DataArray` attributes 'feature_name'.
        dim: str
            Name of the vertical dimension in the input :class:`xarray.Dataset`

        Returns
        -------
        X: np.array
            Pre-processed set of features, with dimensions (N_SAMPLE, N_FEATURES)
        sampling_dims: list()
            List of the input :class:`xarray.Dataset` dimensions stacked as sampling points
        """
        this_context = str(action) + '.1-preprocess'
        with self._context(this_context, self._context_args):
            if self._debug:
                print("> Start preprocessing for action '%s'" % action)

            # How do we find the feature variables in this dataset ?
            features_dict = ds.pyxpcm.feature_dict(self, features=features)

            # Determine the mask where all features are defined for this PCM:
            with self._context(this_context + '.1-mask', self._context_args):
                if not mask:
                    mask = ds.pyxpcm.mask(self, features=features, dim=dim)
                    # Stack the all-features mask:
                    mask = mask.stack({'sampling': list(mask.dims)})
                self._xmask = mask

            # Pre-process all features and build the X array
            X = np.empty(())
            Xlabel = list()  # Construct a list of string labels for each feature dimension (useful for plots)
            F = self.F  # Nb of features

            for feature_in_pcm in features_dict:
                feature_in_ds = features_dict[feature_in_pcm]
                if self._debug:
                    print(("\n\t> Preprocessing xarray dataset '%s' as PCM feature '%s'")
                          % (feature_in_ds, feature_in_pcm))

                if ('maxlevel' in self._context_args) and (self._context_args['maxlevel'] <= 2):
                    a = this_context + '.2-features'
                else:
                    a = this_context

                with self._context(a, self._context_args):
                    da = ds[feature_in_ds]
                    x, sampling_dims = self.preprocessing_this(da,
                                                               dim=dim,
                                                               feature_name=feature_in_pcm,
                                                               action=action)
                    xlabel = ["%s_%i" % (feature_in_pcm, i) for i in range(0, x.shape[1])]
                if self._debug:
                    print("\t%s pre-processed with success, " % feature_in_pcm, str(LogDataType(x)))

                with self._context(this_context + '.3-homogeniser', self._context_args):
                    # Store full array mean and std during fit:
                    if F > 1:
                        # For more than 1 feature, we need to make them comparable,
                        # so we normalise each feature by its global stats:
                        # FIT:
                        if (action == 'fit') or (action == 'fit_predict'):
                            self._homogeniser[feature_in_pcm]['mean'] = x.mean().values
                            self._homogeniser[feature_in_pcm]['std'] = x.std().values
                            #todo _homogeniser should be a proper standard scaler
                        # TRANSFORM:
                        x = (x - self._homogeniser[feature_in_pcm]['mean']) / \
                            self._homogeniser[feature_in_pcm]['std']
                        if self._debug and action == 'fit':
                            print(("\tHomogenisation for fit of %s") % (feature_in_pcm))
                        elif self._debug:
                            print(("\tHomogenisation of %s using fit data") % (feature_in_pcm))
                    elif self._debug:
                        print(("\tNo need for homogenisation of %s") % (feature_in_pcm))

                if np.prod(X.shape) == 1:
                    X = x
                    Xlabel = xlabel
                else:
                    X = np.append(X, x, axis=1)
                    [Xlabel.append(i) for i in xlabel]

            with self._context(this_context + '.4-xarray', self._context_args):
                self._xlabel = Xlabel
                if self._debug:
                    print("\tFeatures array shape and type for xarray:",
                          X.shape, type(X), type(X.data))
                X = xr.DataArray(X,
                                 dims=['n_samples', 'n_features'],
                                 coords={'n_samples': range(0, X.shape[0]),
                                         'n_features': Xlabel})

        if self._debug:
            print("> Preprocessing done, working with final X (%s) array of shape:" % type(X),
                  X.shape, " and sampling dimensions:", sampling_dims)
        return X, sampling_dims
    def fit(self, ds, features=None, dim=None):
        """Estimate PCM parameters

        For a PCM, the fit method consists in the following operations:

        - pre-processing
            - interpolation to the ``feature_axis`` levels of the model
            - scaling
            - reduction
        - estimate classifier parameters

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            The dataset to work with
        features: dict()
            Definitions of PCM features in the input :class:`xarray.Dataset`.
            If not specified or set to None, features are identified
            using :class:`xarray.DataArray` attributes 'feature_name'.
        dim: str
            Name of the vertical dimension in the input :class:`xarray.Dataset`

        Returns
        -------
        self
        """
        with self._context('fit', self._context_args):

            # PRE-PROCESSING:
            X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='fit')

            # CLASSIFICATION-MODEL TRAINING:
            with self._context('fit.fit', self._context_args):
                self._classifier.fit(X)

            with self._context('fit.score', self._context_args):
                self._props['llh'] = self._classifier.score(X)

            # Furthermore gather some information about the fit:
            self._fit_stats['score'] = self._props['llh']
            self._fit_stats['datetime'] = datetime.utcnow()
            if 'n_samples_seen_' not in self._classifier.__dict__:
                self._fit_stats['n_samples_seen_'] = X.shape[0]
            else:
                self._fit_stats['n_samples_seen_'] = self._classifier.n_samples_seen_
            if 'n_iter_' in self._classifier.__dict__:
                self._fit_stats['n_iter_'] = self._classifier.n_iter_

        # Done:
        self.fitted = True
        return self
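
    # Illustrative usage sketch (assumed dataset): fit the PCM on a dataset ``ds`` whose
    # 'TEMP' variable holds temperature profiles along a vertical dimension 'DEPTH'
    # (all three names are hypothetical):
    #
    #     >>> m.fit(ds, features={'temperature': 'TEMP'}, dim='DEPTH')
    #     >>> m.fitted
    #     True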
    def predict(self, ds, features=None, dim=None, inplace=False, name='PCM_LABELS'):
        """Predict labels for profile samples

        This method adds these properties to the PCM object:

        - ``llh``: The log likelihood of the model with regard to new data

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            The dataset to work with
        features: dict()
            Definitions of PCM features in the input :class:`xarray.Dataset`.
            If not specified or set to None, features are identified
            using :class:`xarray.DataArray` attributes 'feature_name'.
        dim: str
            Name of the vertical dimension in the input :class:`xarray.Dataset`
        inplace: boolean, False by default
            If False, return a :class:`xarray.DataArray` with predicted labels.
            If True, return the input :class:`xarray.Dataset` with labels
            added as a new :class:`xarray.DataArray`.
        name: str, default is 'PCM_LABELS'
            Name of the :class:`xarray.DataArray` with labels

        Returns
        -------
        :class:`xarray.DataArray`
            Component labels (if option 'inplace' = False)

        *or*

        :class:`xarray.Dataset`
            Input dataset with component labels as a 'PCM_LABELS' new :class:`xarray.DataArray`
            (if option 'inplace' = True)
        """
        with self._context('predict', self._context_args):

            # Check if the PCM is trained:
            validation.check_is_fitted(self, 'fitted')

            # PRE-PROCESSING:
            X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='predict')

            # CLASSIFICATION PREDICTION:
            with self._context('predict.predict', self._context_args):
                labels = self._classifier.predict(X)
            with self._context('predict.score', self._context_args):
                llh = self._classifier.score(X)

            # Create a xarray with labels output:
            with self._context('predict.xarray', self._context_args):
                da = self.unravel(ds, sampling_dims, labels).rename(name)
                da.attrs['long_name'] = 'PCM labels'
                da.attrs['units'] = ''
                da.attrs['valid_min'] = 0
                da.attrs['valid_max'] = self._props['K'] - 1
                da.attrs['llh'] = llh

            # Add labels to the dataset:
            if inplace:
                return ds.pyxpcm.add(da)
            else:
                return da
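
    # Illustrative sketch (assumed dataset/variable names): predict labels for new data,
    # either as a standalone DataArray or added to the dataset in place:
    #
    #     >>> labels = m.predict(ds, features={'temperature': 'TEMP'}, dim='DEPTH')
    #     >>> ds = m.predict(ds, features={'temperature': 'TEMP'}, dim='DEPTH', inplace=True)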
    def fit_predict(self, ds, features=None, dim=None, inplace=False, name='PCM_LABELS'):
        """Estimate PCM parameters and predict classes

        This method adds these properties to the PCM object:

        - ``llh``: The log likelihood of the model with regard to new data

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            The dataset to work with
        features: dict()
            Definitions of PCM features in the input :class:`xarray.Dataset`.
            If not specified or set to None, features are identified
            using :class:`xarray.DataArray` attributes 'feature_name'.
        dim: str
            Name of the vertical dimension in the input :class:`xarray.Dataset`
        inplace: boolean, False by default
            If False, return a :class:`xarray.DataArray` with predicted labels.
            If True, return the input :class:`xarray.Dataset` with labels
            added as a new :class:`xarray.DataArray`.
        name: string ('PCM_LABELS')
            Name of the DataArray holding labels.

        Returns
        -------
        :class:`xarray.DataArray`
            Component labels (if option 'inplace' = False)

        *or*

        :class:`xarray.Dataset`
            Input dataset with component labels as a 'PCM_LABELS' new :class:`xarray.DataArray`
            (if option 'inplace' = True)
        """
        with self._context('fit_predict', self._context_args):

            # PRE-PROCESSING:
            X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='fit_predict')

            # CLASSIFICATION-MODEL TRAINING:
            with self._context('fit_predict.fit', self._context_args):
                self._classifier.fit(X)

            with self._context('fit_predict.score', self._context_args):
                self._props['llh'] = self._classifier.score(X)

            # Furthermore gather some information about this fit:
            self._fit_stats['score'] = self._props['llh']
            if 'n_samples_seen_' not in self._classifier.__dict__:
                self._fit_stats['n_samples_seen_'] = X.shape[0]
            else:
                self._fit_stats['n_samples_seen_'] = self._classifier.n_samples_seen_
            if 'n_iter_' in self._classifier.__dict__:
                self._fit_stats['n_iter_'] = self._classifier.n_iter_

            # Done:
            self.fitted = True

            # CLASSIFICATION PREDICTION:
            with self._context('fit_predict.predict', self._context_args):
                labels = self._classifier.predict(X)

            # Create a xarray with labels output:
            with self._context('fit_predict.xarray', self._context_args):
                da = self.unravel(ds, sampling_dims, labels).rename(name)
                da.attrs['long_name'] = 'PCM labels'
                da.attrs['units'] = ''
                da.attrs['valid_min'] = 0
                da.attrs['valid_max'] = self._props['K'] - 1
                da.attrs['llh'] = self._props['llh']

            # Add labels to the dataset:
            if inplace:
                return ds.pyxpcm.add(da)
            else:
                return da
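
    # Illustrative sketch (assumed names): fit and label the same dataset in one call:
    #
    #     >>> labels = m.fit_predict(ds, features={'temperature': 'TEMP'}, dim='DEPTH')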
    def predict_proba(self, ds, features=None, dim=None, inplace=False, name='PCM_POST',
                      classdimname='pcm_class'):
        """Predict posterior probability of each component given the data

        This method adds these properties to the PCM instance:

        - ``llh``: The log likelihood of the model with regard to new data

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            The dataset to work with
        features: dict()
            Definitions of PCM features in the input :class:`xarray.Dataset`.
            If not specified or set to None, features are identified
            using :class:`xarray.DataArray` attributes 'feature_name'.
        dim: str
            Name of the vertical dimension in the input :class:`xarray.Dataset`
        inplace: boolean, False by default
            If False, return a :class:`xarray.DataArray` with predicted probabilities.
            If True, return the input :class:`xarray.Dataset` with probabilities
            added as a new :class:`xarray.DataArray`.
        name: str, default is 'PCM_POST'
            Name of the DataArray with prediction probability (posteriors)
        classdimname: str, default is 'pcm_class'
            Name of the dimension holding classes

        Returns
        -------
        :class:`xarray.DataArray`
            Probability of each Gaussian (state) in the model given each sample
            (if option 'inplace' = False)

        *or*

        :class:`xarray.Dataset`
            Input dataset with component probability as a 'PCM_POST' new :class:`xarray.DataArray`
            (if option 'inplace' = True)
        """
        with self._context('predict_proba', self._context_args):

            # Check if the PCM is trained:
            validation.check_is_fitted(self, 'fitted')

            # PRE-PROCESSING:
            X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='predict_proba')

            # CLASSIFICATION PREDICTION:
            with self._context('predict_proba.predict', self._context_args):
                post_values = self._classifier.predict_proba(X)
            with self._context('predict_proba.score', self._context_args):
                self._props['llh'] = self._classifier.score(X)

            # Create a xarray with posteriors:
            with self._context('predict_proba.xarray', self._context_args):
                P = list()
                for k in range(self.K):
                    X = post_values[:, k]
                    x = self.unravel(ds, sampling_dims, X)
                    P.append(x)
                da = xr.concat(P, dim=classdimname).rename(name)
                da.attrs['long_name'] = 'PCM posteriors'
                da.attrs['units'] = ''
                da.attrs['valid_min'] = 0
                da.attrs['valid_max'] = 1
                da.attrs['llh'] = self._props['llh']

            # Add posteriors to the dataset:
            if inplace:
                return ds.pyxpcm.add(da)
            else:
                return da
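
    # Illustrative sketch (assumed names): get the posterior probability of each class,
    # returned with a leading 'pcm_class' dimension of length K:
    #
    #     >>> post = m.predict_proba(ds, features={'temperature': 'TEMP'}, dim='DEPTH')
    #     >>> post.sum('pcm_class')  # posteriors sum to ~1 for every sample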
    def score(self, ds, features=None, dim=None):
        """Compute the per-sample average log-likelihood of the given data

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            The dataset to work with
        features: dict()
            Definitions of PCM features in the input :class:`xarray.Dataset`.
            If not specified or set to None, features are identified
            using :class:`xarray.DataArray` attributes 'feature_name'.
        dim: str
            Name of the vertical dimension in the input :class:`xarray.Dataset`

        Returns
        -------
        log_likelihood: float
            In the case of a GMM classifier, this is the log likelihood of the Gaussian mixture given the data
        """
        with self._context('score', self._context_args):

            # Check if the PCM is trained:
            validation.check_is_fitted(self, 'fitted')

            # PRE-PROCESSING:
            X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='score')

            # COMPUTE THE PREDICTION SCORE:
            with self._context('score.score', self._context_args):
                llh = self._classifier.score(X)

        return llh
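
    # Illustrative sketch (assumed names): per-sample average log-likelihood of new data:
    #
    #     >>> llh = m.score(ds, features={'temperature': 'TEMP'}, dim='DEPTH')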
    def bic(self, ds, features=None, dim=None):
        """Compute the Bayesian information criterion for the current model on the input dataset

        Only for a GMM classifier

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            The dataset to work with
        features: dict()
            Definitions of PCM features in the input :class:`xarray.Dataset`.
            If not specified or set to None, features are identified
            using :class:`xarray.DataArray` attributes 'feature_name'.
        dim: str
            Name of the vertical dimension in the input :class:`xarray.Dataset`

        Returns
        -------
        bic: float
            The lower the better
        """
        with self._context('bic', self._context_args):

            # Check classifier:
            if self._props['with_classifier'] != 'gmm':
                raise Exception(("BIC is only available for the 'gmm' classifier ('%s')") %
                                (self._props['with_classifier']))

            def _n_parameters(_classifier):
                """Return the number of free parameters in the model. See sklearn code"""
                _, n_features = _classifier.means_.shape
                if _classifier.covariance_type == 'full':
                    cov_params = _classifier.n_components * n_features * (n_features + 1) / 2.
                elif _classifier.covariance_type == 'diag':
                    cov_params = _classifier.n_components * n_features
                elif _classifier.covariance_type == 'tied':
                    cov_params = n_features * (n_features + 1) / 2.
                elif _classifier.covariance_type == 'spherical':
                    cov_params = _classifier.n_components
                mean_params = n_features * _classifier.n_components
                return int(cov_params + mean_params + _classifier.n_components - 1)

            # Check if the PCM is trained:
            validation.check_is_fitted(self, 'fitted')

            # PRE-PROCESSING:
            X, sampling_dims = self.preprocessing(ds, features=features, dim=dim, action='bic')

            # COMPUTE THE log-likelihood:
            with self._context('bic.score', self._context_args):
                llh = self._classifier.score(X)

            # COMPUTE BIC:
            N_samples = X.shape[0]
            bic = (-2 * llh * N_samples + _n_parameters(self._classifier) * np.log(N_samples))

        return bic
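
    # Illustrative sketch (assumed names): BIC can be used to compare models with different
    # numbers of classes; the lower the better:
    #
    #     >>> for k in range(2, 10):
    #     ...     mk = pcm(K=k, features=m.features)
    #     ...     mk.fit(ds, features={'temperature': 'TEMP'}, dim='DEPTH')
    #     ...     print(k, mk.bic(ds, features={'temperature': 'TEMP'}, dim='DEPTH'))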