Source code for pyxpcm.io

#!/bin/env python
# -*- coding: UTF-8 -*-
#
# m = pyxpcm.load_netcdf(<path>)
# m.to_netcdf(<path>)
#
# Created by gmaze on 2019-10-15
__author__ = 'gmaze@ifremer.fr'

import os
import sys
import warnings
import numpy as np
import xarray as xr
import pickle
from datetime import datetime
import errno
from sklearn.utils import validation
from sklearn.mixture import GaussianMixture
from . import models
from . import __version__

# Define variables for netcdf load/save:
__software_name__ = 'Profile Classification Model - pyXpcm library'
__format_version__ = 2.0
# Version 1.0 was used by the Matlab library and is limited to a single-feature clustering.
# Rather than converting old files, we make sure that version 1.0 files can still be loaded.

def _TransformerName(obj):
    """ Return the bare class name of a transformer instance """
    return str(type(obj)).split('>')[0].split('.')[-1].split("'")[0]
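
# For example (illustrative only): _TransformerName(GaussianMixture()) returns 'GaussianMixture',
# because str(type(obj)) looks like "<class 'sklearn.mixture._gaussian_mixture.GaussianMixture'>"
# and the chained splits peel off the surrounding "<class '...'>" decoration.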

def _load(file_path="pcm.nc"):
    """ Load a PCM file"""
    filename, file_extension = os.path.splitext(file_path)
    if file_extension == '.nc':
        m = load_netcdf(file_path)
    else:
        with open(file_path, "rb") as f:
            m = pickle.load(f)
    return m

def _save(m, file_path="pcm.nc"):
    """ Save a PCM to file """
    filename, file_extension = os.path.splitext(file_path)
    if file_extension == '.nc':
        to_netcdf(m, file_path)
    else:
        with open(file_path, "wb") as f:
            pickle.dump(m, f)
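
# A minimal round-trip sketch for these helpers (assumes `m` is an existing
# pyxpcm model instance; file names are illustrative):
#
#     _save(m, file_path="pcm.nc")    # '.nc' extension -> netcdf, via to_netcdf()
#     _save(m, file_path="pcm.pkl")   # any other extension -> raw pickle dump
#     m2 = _load(file_path="pcm.nc")  # dispatches on the extension the same way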

def _load_netcdf_format2(ncfile):
    """ Load a PCM model from a netcdf file with format version 2.0

        Parameters
        ----------
        ncfile : str
            File name from which to load a PCM.

    """

    pcm2cdf = dict()
    pcm2cdf['global'] = xr.open_dataset(ncfile, group='/')

    if pcm2cdf['global'].attrs['software'] != __software_name__:
        raise ValueError("Can't load a netcdf file not created with this software.\n" +
                         pcm2cdf['global'].attrs['software'])
    if pcm2cdf['global'].attrs['format_version'] != __format_version__:
        raise ValueError("Incompatible format version " + str(pcm2cdf['global'].attrs['format_version']))

    for feature in pcm2cdf['global']['feature'].values:
        pcm2cdf[feature] = xr.open_dataset(ncfile, group=("feature_%s" % feature))
    pcm2cdf['classifier'] = xr.open_dataset(ncfile, group='classifier')

    # Create a new pcm instance:
    K = pcm2cdf['global']['K'].shape[0]
    scal = {'none': 0, 'normal': 1, 'center': 2}  # Map scaler names back to pyxpcm integer codes
    scaling = scal[pcm2cdf['global'].attrs['scaler']]
    reduction = eval(str(pcm2cdf['global'].attrs['reducer']))
    maxvar = int(pcm2cdf['global'].attrs['reducer_maxvar'])
    classif = pcm2cdf['classifier'].attrs['type']
    covariance_type = pcm2cdf['classifier'].attrs['covariance_type']
    backend = pcm2cdf['global'].attrs['backend']

    features = dict()
    for feature in pcm2cdf['global']['feature'].values:
        features[feature] = pcm2cdf[feature]['Z'].values

    loaded_m = models.pcm(K,
                   features=features,
                   scaling=scaling,
                   reduction=reduction, maxvar=maxvar,
                   classif=classif, covariance_type=covariance_type,
                   backend=backend)

    # Fill new instance with fitted method information:
    for feature in loaded_m.features:

        if eval(pcm2cdf['global'].attrs['fitted']):
            # Scaler:
            if pcm2cdf['global'].attrs['scaler'] in ['normal', 'center']:
                loaded_m._scaler[feature].mean_ = pcm2cdf[feature]['scaler_center'].values
                if pcm2cdf['global'].attrs['scaler'] == 'normal':
                    loaded_m._scaler[feature].scale_ = pcm2cdf[feature]['scaler_scale'].values
                else:
                    setattr(loaded_m._scaler[feature], 'scale_', None)

                setattr(loaded_m._scaler[feature], 'fitted', True)
                validation.check_is_fitted(loaded_m._scaler[feature], 'fitted')

            # Reducer:
            if reduction:
                loaded_m._reducer[feature].mean_ = pcm2cdf[feature]['reducer_center'].values
                loaded_m._reducer[feature].components_ = pcm2cdf[feature]['reducer_eigenvector'].values
                setattr(loaded_m._reducer[feature], 'fitted', True)
                validation.check_is_fitted(loaded_m._reducer[feature], 'fitted')

            # Homogeniser:
            loaded_m._homogeniser[feature]['mean'] = pcm2cdf[feature].attrs['homogeniser'][0]
            loaded_m._homogeniser[feature]['std'] = pcm2cdf[feature].attrs['homogeniser'][1]

    # Classifier:
    if eval(pcm2cdf['global'].attrs['fitted']):
        gmm = GaussianMixture(n_components=loaded_m.K,
                              covariance_type=covariance_type,
                              n_init=0, max_iter=1, warm_start=True,
                              weights_init=pcm2cdf['classifier']['prior'].values,
                              means_init=pcm2cdf['classifier']['center'].values,
                              precisions_init=pcm2cdf['classifier']['precision'].values)
        setattr(gmm, 'fitted', True)
        setattr(gmm, 'weights_', pcm2cdf['classifier']['prior'].values)
        setattr(gmm, 'means_', pcm2cdf['classifier']['center'].values)
        setattr(gmm, 'precisions_cholesky_', pcm2cdf['classifier']['precision_cholesky'].values)
        loaded_m._classifier = gmm
        validation.check_is_fitted(gmm, 'fitted')

    # PCM properties
    if eval(pcm2cdf['global'].attrs['fitted']):
        loaded_m._props['llh'] = pcm2cdf['global'].attrs['fit_score']
        setattr(loaded_m, 'fitted', True)

    return loaded_m
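
# The format 2.0 layout consumed above is made of plain netcdf4 groups, so a saved
# model can also be inspected directly with xarray. A sketch, assuming a file
# 'pcm.nc' holding a hypothetical feature named 'temperature':
#
#     xr.open_dataset('pcm.nc', group='/')                    # 'feature', 'class', global attrs
#     xr.open_dataset('pcm.nc', group='feature_temperature')  # scaler/reducer arrays on axis 'Z'
#     xr.open_dataset('pcm.nc', group='classifier')           # 'prior', 'center', 'precision', ...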

def _load_netcdf_format1(ncfile):
    """ Load a PCM model from a netcdf file with format version 1.0

        Model loader for Matlab PCM files

        Parameters
        ----------
        ncfile : str
            File name from which to load a PCM.

    """

    pcm2cdf = dict()
    pcm2cdf['global'] = xr.open_dataset(ncfile, group='/')

    if pcm2cdf['global'].attrs['software'] != "Profile Classification Model - Matlab Toolbox (c) Ifremer":
        raise ValueError("Can't load netcdf not created with appropriate software.\n" +
                         pcm2cdf['global'].attrs['software'])
    if pcm2cdf['global'].attrs['format_version'] != "1.0":
        raise ValueError("Incompatible format version " + pcm2cdf['global'].attrs['format_version'])

    pcm2cdf['scaler'] = xr.open_dataset(ncfile, group='Normalization')
    pcm2cdf['reducer'] = xr.open_dataset(ncfile, group='Reduction')
    pcm2cdf['classifier'] = xr.open_dataset(ncfile, group='ClassificationModel')

    # Create a new pcm instance:
    K = len(pcm2cdf['classifier']['CLASS'])

    scaling = int(pcm2cdf['global'].attrs['PCM_normalization'])

    reduction = False
    if pcm2cdf['global'].attrs['PCM_doREDUCE'] == 'true':
        reduction = True
    #     maxvar = int(pcm2cdf['global'].attrs['reducer_maxvar'])
    maxvar = len(pcm2cdf['reducer']['REDUCED_DIM'])

    classif = 'gmm'
    covariance_type = pcm2cdf['classifier'].attrs['Covariance_matrix_original_form']
    backend = 'sklearn'

    feature = 'unknown'
    features = {feature: pcm2cdf['global']['DEPTH_MODEL']}

    loaded_m = models.pcm(K,
                          features=features,
                          scaling=scaling,
                          reduction=reduction, maxvar=maxvar,
                          classif=classif, covariance_type=covariance_type,
                          backend=backend)

    # Fill new instance with fitted method information:

    # Scaler:
    if scaling in [1, 2]:
        loaded_m._scaler[feature].mean_ = pcm2cdf['scaler']['X_ave'].values
        if scaling == 1:
            loaded_m._scaler[feature].scale_ = pcm2cdf['scaler']['X_std'].values
        else:
            setattr(loaded_m._scaler[feature], 'scale_', None)
        setattr(loaded_m._scaler[feature], 'fitted', True)
        validation.check_is_fitted(loaded_m._scaler[feature], 'fitted')

    # Reducer:
    if reduction:
        loaded_m._reducer[feature].mean_ = pcm2cdf['reducer']['X_ref'].values
        loaded_m._reducer[feature].components_ = pcm2cdf['reducer']['EOFs'].values
        setattr(loaded_m._reducer[feature], 'fitted', True)
        validation.check_is_fitted(loaded_m._reducer[feature], 'fitted')

    # Classifier:
    gmm = GaussianMixture(n_components=loaded_m.K,
                          covariance_type=covariance_type,
                          n_init=0, max_iter=1, warm_start=True,
                          weights_init=pcm2cdf['classifier']['priors'].values,
                          means_init=pcm2cdf['classifier']['centers'].values,
                          precisions_init=np.linalg.inv(pcm2cdf['classifier']['covariances'].values))
    setattr(gmm, 'fitted', True)
    setattr(gmm, 'weights_', pcm2cdf['classifier']['priors'].values)
    setattr(gmm, 'means_', pcm2cdf['classifier']['centers'].values)
    setattr(gmm, 'precisions_cholesky_', np.linalg.cholesky(np.linalg.inv(pcm2cdf['classifier']['covariances'].values)))
    loaded_m._classifier = gmm
    validation.check_is_fitted(gmm, 'fitted')

    # PCM properties
    loaded_m._props['llh'] = pcm2cdf['classifier']['llh'].values
    setattr(loaded_m, 'fitted', True)

    return loaded_m
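
# Sanity-check sketch for the linear algebra used above (toy 2x2 case, illustrative
# only): the loader inverts the stored covariances to get precisions, and
# np.linalg.cholesky returns a lower-triangular L such that L @ L.T reconstructs
# the precision matrix:
#
#     cov = np.array([[2.0, 0.3], [0.3, 1.0]])
#     prec = np.linalg.inv(cov)
#     L = np.linalg.cholesky(prec)
#     assert np.allclose(L @ L.T, prec)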

def to_netcdf(m, ncfile=None, global_attributes=dict(), mode='w'):
    """ Save a PCM to a netcdf file

        Any existing file at this location will be overwritten by default.
        Time logging information is not saved.

        Parameters
        ----------
        m : pcm
            The model to save.
        ncfile : str
            File name where to save this PCM.
        global_attributes : dict
            Dictionary of attributes to add to the netcdf4 file under the global scope.
        mode : str
            Writing mode of the file. mode='w' (default) overwrites any existing file.
            Anything else will raise an error if the file exists.

    """
    if (mode == 'w' and os.path.exists(ncfile) and os.path.isfile(ncfile)):
        os.remove(ncfile)
    elif (os.path.exists(ncfile) and os.path.isfile(ncfile)):
        raise OSError(errno.EEXIST, "File exists. Use mode='w' to overwrite. ", ncfile)

    # Check if we know how to save predictors:
    for feature in m.features:
        scalers_ok = ['StandardScaler', 'NoTransform']
        if _TransformerName(m._scaler[feature]) not in scalers_ok:
            raise TypeError("Export to netcdf is not supported for scaler of type: "
                            + str(type(m._scaler[feature])))
        reducers_ok = ['PCA', 'NoTransform']
        if _TransformerName(m._reducer[feature]) not in reducers_ok:
            raise TypeError("Export to netcdf is not supported for reducer of type: "
                            + str(type(m._reducer[feature])))
    if not isinstance(m._classifier, GaussianMixture):
        raise TypeError("Export to netcdf is not supported for classifier of type: "
                        + str(type(m._classifier)))

    # Create the list of xr.Dataset that go into the different scopes of the netcdf4 file:
    pcm2cdf = dict()

    # Create global scope:
    feature_names = [feature for feature in m.features]
    ds_global = xr.merge([
        xr.DataArray(feature_names, name='feature', dims='F', coords={'F': np.arange(0, m.F)}),
        xr.DataArray(np.arange(0, m.K), name='class', dims='K', coords={'K': np.arange(0, m.K)}),
    ])
    ds_global.attrs['backend'] = m.backend
    if hasattr(m, 'fitted'):
        ds_global.attrs['fitted'] = str(m.fitted)
        ds_global.attrs['fit_datetime'] = str(m.fitstats['datetime'])
        ds_global.attrs['fit_score'] = m.fitstats['score']
        ds_global.attrs['fit_n_samples_seen'] = m.fitstats['n_samples_seen_']
    else:
        ds_global.attrs['fitted'] = str(False)
    ds_global.attrs['scaler'] = m._props['with_scaler']
    ds_global.attrs['reducer'] = str(m._props['with_reducer'])
    ds_global.attrs['reducer_maxvar'] = str(m._props['maxvar'])
    ds_global.attrs['software'] = __software_name__
    ds_global.attrs['software_version'] = __version__
    ds_global.attrs['format_version'] = __format_version__
    # Add user defined additional global attributes:
    for key in global_attributes:
        ds_global.attrs[key] = global_attributes[key]
    pcm2cdf['global'] = ds_global

    # Create feature scopes:
    for feature in m.features:
        if hasattr(m, 'fitted'):
            if m._props['with_scaler'] == 'normal' or m._props['with_scaler'] == 'center':
                ds_scaler = xr.DataArray(m._scaler[feature].mean_, name='scaler_center',
                                         dims='Z', coords={'Z': m._props['features'][feature]},
                                         attrs={'feature': feature,
                                                'long_name': 'scaler mean',
                                                'unit': m._scaler_props[feature]['units']}).to_dataset()
                if m._props['with_scaler'] == 'normal':
                    ds_scaler = xr.merge([ds_scaler,
                                          xr.DataArray(m._scaler[feature].scale_, name='scaler_scale',
                                                       dims='Z', coords={'Z': m._props['features'][feature]},
                                                       attrs={'feature': feature,
                                                              'long_name': 'scaler std',
                                                              'unit': m._scaler_props[feature]['units']})
                                          ])
            if m._props['with_reducer']:
                Xeof_mean = m._reducer[feature].mean_
                Xeof = m._reducer[feature].components_
                ds_reducer = xr.merge([
                    xr.DataArray(Xeof_mean, name='reducer_center',
                                 dims=['Z'], coords={'Z': m._props['features'][feature]},
                                 attrs={'feature': feature, 'long_name': 'PCA center'}),
                    xr.DataArray(Xeof, name='reducer_eigenvector',
                                 dims=['REDUCED_DIM', 'Z'],
                                 coords={'Z': m._props['features'][feature],
                                         'REDUCED_DIM': np.arange(0, Xeof.shape[0])},
                                 attrs={'feature': feature,
                                        'long_name': 'PCA eigen vectors',
                                        'maxvar': str(m._props['maxvar'])})
                ])
            if 'none' not in m._props['with_scaler'] and m._props['with_reducer']:
                ds_feature = xr.merge([ds_scaler, ds_reducer])
            elif 'none' in m._props['with_scaler'] and m._props['with_reducer']:
                ds_feature = ds_reducer
            elif 'none' not in m._props['with_scaler'] and not m._props['with_reducer']:
                ds_feature = ds_scaler
            else:
                ds_feature = xr.DataArray(m._props['features'][feature], name='Z', dims='N_LEVELS',
                                          attrs={'feature': feature,
                                                 'long_name': 'Vertical_axis'}).to_dataset()
            ds_feature.attrs['homogeniser'] = np.array(
                [m._homogeniser[feature]['mean'], m._homogeniser[feature]['std']])
        else:
            ds_feature = xr.DataArray(m._props['features'][feature], name='Z', dims='N_LEVELS',
                                      attrs={'feature': feature,
                                             'long_name': 'Vertical_axis'}).to_dataset()
        ds_feature.attrs['name'] = feature
        pcm2cdf[feature] = ds_feature

    # Create classifier scope:
    if hasattr(m, 'fitted'):
        if m._props['with_reducer']:
            GMM_DIM = np.arange(0, np.sum([len(pcm2cdf[f]['REDUCED_DIM']) for f in m.features]))
        else:
            GMM_DIM = np.arange(0, np.sum([len(pcm2cdf[f]['Z']) for f in m.features]))

        if m._classifier.covariance_type == 'full':
            covars = m._classifier.covariances_
            precisions = m._classifier.precisions_
            precisions_cholesky = m._classifier.precisions_cholesky_
        elif m._classifier.covariance_type == 'diag':
            covars = np.zeros((m.K, len(GMM_DIM), len(GMM_DIM)))
            precisions = np.zeros((m.K, len(GMM_DIM), len(GMM_DIM)))
            precisions_cholesky = np.zeros((m.K, len(GMM_DIM), len(GMM_DIM)))
            for ik, k in enumerate(m):
                covars[k, :, :] = np.diag(m._classifier.covariances_[ik, :])
                precisions[k, :, :] = np.diag(m._classifier.precisions_[ik, :])
                precisions_cholesky[k, :, :] = np.diag(m._classifier.precisions_cholesky_[ik, :])
        elif m._classifier.covariance_type == 'spherical':
            covars = np.zeros((m.K, len(GMM_DIM), len(GMM_DIM)))
            precisions = np.zeros((m.K, len(GMM_DIM), len(GMM_DIM)))
            precisions_cholesky = np.zeros((m.K, len(GMM_DIM), len(GMM_DIM)))
            for ik, k in enumerate(m):
                covars[k, :, :] = m._classifier.covariances_[ik] * np.eye(len(GMM_DIM))
                precisions[k, :, :] = m._classifier.precisions_[ik] * np.eye(len(GMM_DIM))
                precisions_cholesky[k, :, :] = m._classifier.precisions_cholesky_[ik] * np.eye(len(GMM_DIM))

        ds_classifier = xr.merge([
            xr.DataArray(m._classifier.weights_, name='prior',
                         dims='K', coords={'K': pcm2cdf['global']['K']},
                         attrs={'long_name': 'Mixture component priors'}),
            xr.DataArray(m._classifier.means_, name='center',
                         dims=['K', 'GMM_DIM'],
                         coords={'K': pcm2cdf['global']['K'], 'GMM_DIM': GMM_DIM},
                         attrs={'long_name': 'Mixture component centers'}),
            xr.DataArray(covars, name='covariance',
                         dims=['K', 'GMM_DIM', 'GMM_DIM'],
                         attrs={'long_name': 'Mixture component covariances',
                                'initial_shape': m._props['COVARTYPE']}),
            xr.DataArray(precisions, name='precision',
                         dims=['K', 'GMM_DIM', 'GMM_DIM'],
                         attrs={'long_name': 'Mixture component precisions',
                                'comment': 'A precision matrix is the inverse of a covariance matrix. '
                                           'Storing the precision matrices instead of the covariance matrices makes '
                                           'it more efficient to compute the log-likelihood of new samples.',
                                'initial_shape': m._props['COVARTYPE']}),
            xr.DataArray(precisions_cholesky, name='precision_cholesky',
                         dims=['K', 'GMM_DIM', 'GMM_DIM'],
                         attrs={'long_name': 'Mixture component Cholesky precisions',
                                'comment': 'Cholesky decomposition of the precision matrices '
                                           'of each mixture component.',
                                'initial_shape': m._props['COVARTYPE']}),
            xr.DataArray(m._xlabel, name='dimension_name', dims=['GMM_DIM'],
                         attrs={'long_name': 'Name of reduced dimensions'})
        ])
        ds_classifier.attrs['type'] = m._props['with_classifier']
        ds_classifier.attrs['covariance_type'] = m._props['COVARTYPE']
    else:
        ds_classifier = xr.Dataset()
        ds_classifier.attrs['type'] = m._props['with_classifier']
        ds_classifier.attrs['covariance_type'] = m._props['COVARTYPE']
    pcm2cdf['classifier'] = ds_classifier

    # Create file:
    pcm2cdf['global'].attrs['creation_date'] = str(datetime.utcnow())
    pcm2cdf['global'].to_netcdf(ncfile, mode='w', format='NETCDF4', group='/')
    for feature in m.features:
        pcm2cdf[feature].to_netcdf(ncfile, mode='a', format='NETCDF4', group='feature_' + feature)
    # if hasattr(m, 'fitted'):
    pcm2cdf['classifier'].to_netcdf(ncfile, mode='a', format='NETCDF4', group='classifier')
def load_netcdf(ncfile):
    """ Load a PCM model from a netcdf file

        Parameters
        ----------
        ncfile : str
            File name from which to load a PCM.

    """
    pcm2cdf = dict()
    pcm2cdf['global'] = xr.open_dataset(ncfile, group='/')

    # Check file format:
    if pcm2cdf['global'].attrs['format_version'] == "1.0":
        loaded_m = _load_netcdf_format1(ncfile)
    elif pcm2cdf['global'].attrs['format_version'] != __format_version__:
        raise ValueError("Incompatible format version " + str(pcm2cdf['global'].attrs['format_version']))
    else:
        loaded_m = _load_netcdf_format2(ncfile)

    return loaded_m
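
# End-to-end usage sketch of the public entry points (assumes `m` is a fitted
# pyxpcm model; the file name is illustrative):
#
#     to_netcdf(m, 'my_pcm.nc')      # writes groups '/', 'feature_<name>' and 'classifier'
#     m2 = load_netcdf('my_pcm.nc')  # dispatches on the 'format_version' global attribute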