Source code for pyxpcm.xarray

#!/bin/env python
# -*coding: UTF-8 -*-
"""
Provide accessor to enhance interoperability between :mod:`xarray` and :mod:`pyxpcm`.

Provide a scope named ``pyxpcm`` as accessor to :class:`xarray.Dataset` objects.

"""
# Created by gmaze on 2019-10-02

# Note that because of https://github.com/pydata/xarray/issues/3268, nothing happens in place here. So it should always goes like (even with `inplace=True`):
#
#     ds = ds.pyxpcm.<method>()

import os
import sys
import warnings
import numpy as np
import xarray as xr
import dask
from .models import pcm, PCMFeatureError
from . import stat
# from .utils import docstring

# Decorators
def pcm_method(func):
    #todo Decorator that directly map PCM functions on xarray accessor
    func.__doc__ = getattr(pcm, func.__name__).__doc__
    return func

def pcm_stat_method(func):
    #todo Decorator that directly map PCM functions on xarray accessor
    func.__doc__ = getattr(stat, func.__name__).__doc__
    return func

[docs]@xr.register_dataset_accessor('pyxpcm') class pyXpcmDataSetAccessor: """ Class registered under scope ``pyxpcm`` to access :class:`xarray.Dataset` objects. """ def __init__(self, xarray_obj): self._obj = xarray_obj self._added = list() # Will record all new variables added by pyXpcm self._dims = list(xarray_obj.dims.keys()) # Store the initial list of dimensions def __id_feature_name(self, this_pcm, feature): """Identify the dataset variable name to be used for a given feature name feature must be a dictionary or None for automatic discovery """ feature_name_found = False for feature_in_pcm in feature: if feature_in_pcm not in this_pcm._props['features']: msg = ("Feature '%s' not set in this PCM")%(feature_in_pcm) raise PCMFeatureError(msg) feature_in_ds = feature[feature_in_pcm] if feature_in_ds: feature_name_found = feature_in_ds in self._obj.data_vars if not feature_name_found: feature_name_found = feature_in_pcm in self._obj.data_vars feature_in_ds = feature_in_pcm if not feature_name_found: # Look for the feature in the dataset data variables attributes for v in self._obj.data_vars: if ('feature_name' in self._obj[v].attrs) and (self._obj[v].attrs['feature_name'] is feature_in_pcm): feature_in_ds = v feature_name_found = True continue if not feature_name_found: msg = ("Feature '%s' not found in this dataset. You may want to add the 'feature_name' " "attribute to the variable you'd like to use or provide a dictionnary")%(feature_in_pcm) raise PCMFeatureError(msg) elif this_pcm._debug: print(("\tIdying '%s' as '%s' in this dataset") % (feature_in_pcm, feature_in_ds)) return feature_in_ds
[docs] def add(self, da): """Add a :class:`xarray.DataArray` to this :class:`xarray.Dataset`""" if da.name in self._obj.data_vars: warnings.warn(("%s variable already in the dataset: overwriting") % (da.name)) # Add pyXpcm tracking clue to this DataArray: da.attrs['_pyXpcm_cleanable'] = True # Add it to the DataSet: self._obj[da.name] = da # Update internal list of added variables: self._added.append(da.name) return self._obj
[docs] def drop_all(self): """ Remove :class:`xarray.DataArray` created with pyXpcm front this :class:`xarray.Dataset`""" def drop_dims(this_ds, dims): return this_ds.drop_dims([k for k in this_ds.dims if k not in dims]) ds = drop_dims(self._obj, self._dims) return ds.drop([ k for k, v in ds.variables.items() if v.attrs.get("_pyXpcm_cleanable") ])
[docs] def split(self): """Split pyXpcm variables from the original :class:`xarray.Dataset` Returns ------- :class:`xarray.Dataset`, :class:`xarray.Dataset` Two DataSest: one with pyXpcm variables, one with the original DataSet """ _ds = self._obj[[ k for k, v in self._obj.variables.items() if v.attrs.get("_pyXpcm_cleanable") ]].copy(deep=True) return _ds, self.drop_all()
[docs] def feature_dict(self, this_pcm, features=None): """ Return dictionary of features for this :class:`xarray.Dataset` and a PCM Parameters ---------- pcm : :class:`pyxpcm.pcmmodel.pcm` features : dict Keys are PCM feature name, Values are corresponding :class:`xarray.Dataset` variable names Returns ------- dict() Dictionary where keys are PCM feature names and values the corresponding :class:`xarray.Dataset` variables """ features_dict = dict() for feature_in_pcm in this_pcm._props['features']: if features == None: feature_in_ds = self.__id_feature_name(this_pcm, {feature_in_pcm: None}) elif feature_in_pcm not in features: raise PCMFeatureError("%s feature not defined" % feature_in_pcm) else: feature_in_ds = features[feature_in_pcm] if feature_in_ds not in self._obj.data_vars: raise PCMFeatureError("Feature %s not in this dataset as %s" % (feature_in_pcm, feature_in_ds)) features_dict[feature_in_pcm] = feature_in_ds # if features: # features_dict = dict() # for feature_in_pcm in features: # feature_in_ds = features[feature_in_pcm] # if not feature_in_ds: # feature_in_ds = self.__id_feature_name(pcm, {feature_in_pcm: None}) # features_dict[feature_in_pcm] = feature_in_ds # else: # features_dict = dict() # for feature_in_pcm in pcm._props['features']: # feature_in_ds = self.__id_feature_name(pcm, {feature_in_pcm: None}) # features_dict[feature_in_pcm] = feature_in_ds # Re-order the dictionary to match the PCM set order: for key in this_pcm._props['features']: features_dict[key] = features_dict.pop(key) return features_dict
[docs] def sampling_dim(self, this_pcm, features=None, dim=None): """ Return the list of dimensions to be stacked for sampling Parameters ---------- pcm : :class:`pyxpcm.pcm` features : None (default) or dict() Keys are PCM feature name, Values are corresponding :class:`xarray.Dataset` variable names. It set to None, all PCM features are used. dim : None (default) or str() The :class:`xarray.Dataset` dimension to use as vertical axis in all features. If set to None, it is automatically set to the dimension with an attribute ``axis`` set to ``Z``. Returns ------- dict() Dictionary where keys are :class:`xarray.Dataset` variable names of features and values are another dictionary with the list of sampling dimension in DIM_SAMPLING key and the name of the vertical axis in the DIM_VERTICAL key. """ feature_dict = self.feature_dict(this_pcm, features=features) SD = dict() for feature_name_in_pcm in feature_dict: feature_name_in_ds = feature_dict[feature_name_in_pcm] da = self._obj[feature_name_in_ds] SD[feature_name_in_ds] = dict() # Is this a thick array or a slice ? is_slice = np.all(this_pcm._props['features'][feature_name_in_pcm] is None) if is_slice: # No vertical dimension to use, simple stacking sampling_dims = list(da.dims) SD[feature_name_in_ds]['DIM_SAMPLING'] = sampling_dims SD[feature_name_in_ds]['DIM_VERTICAL'] = None else: if not dim: # Try to infer the vertical dimension name looking for the CF 'axis' attribute in all dimensions of the array dim_found = False for this_dim in da.dims: if ('axis' in da[this_dim].attrs) and (da[this_dim].attrs['axis'] == 'Z'): dim = this_dim dim_found = True if not dim_found: raise PCMFeatureError("You must specify a vertical dimension name: "\ "use argument 'dim' or "\ "specify DataSet dimension the attribute 'axis' to 'Z' (CF1.6)") elif dim not in da.dims: raise ValueError("Vertical dimension %s not found in this DataArray" % dim) sampling_dims = list(da.dims) sampling_dims.remove(dim) SD[feature_name_in_ds]['DIM_SAMPLING'] = sampling_dims SD[feature_name_in_ds]['DIM_VERTICAL'] = dim return SD
[docs] def mask(self, this_pcm, features=None, dim=None): """ Create a mask where all PCM features are defined Create a mask where all feature profiles are not null over the PCM feature axis. Parameters ---------- :class:`pyxpcm.pcmmodel.pcm` features : dict() Definitions of this_pcm features in the :class:`xarray.Dataset`. If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'. dim : str Name of the vertical dimension in the :class:`xarray.Dataset`. If not specified or set to None, dim is identified as the :class:`xarray.DataArray` variables with attributes 'axis' set to 'z'. Returns ------- :class:`xarray.DataArray` """ feature_dict = self.feature_dict(this_pcm, features=features) SD = self.sampling_dim(this_pcm, dim=dim, features=features) M = list() for feature_name_in_this_pcm in feature_dict: feature_name_in_ds = feature_dict[feature_name_in_this_pcm] da = self._obj[feature_name_in_ds] # Is this a thick array or a slice ? is_slice = np.all(this_pcm._props['features'][feature_name_in_this_pcm] == None) if not is_slice: dim = SD[feature_name_in_ds]['DIM_VERTICAL'] z_top = np.max(this_pcm._props['features'][feature_name_in_this_pcm]) z_bto = np.min(this_pcm._props['features'][feature_name_in_this_pcm]) # Nz = len((self._obj[dim].where(self._obj[dim] >= z_bto, drop=True)\ # .where(self._obj[dim] <= z_top, drop=True)).notnull()) z = self._obj[dim] z_ok = np.ma.masked_inside(z, z_bto, z_top, copy=True).mask Nz = np.count_nonzero(z_ok == True) mask = self._obj[feature_name_in_ds][{dim:z_ok}].notnull().sum(dim=dim) == Nz # mask = self._obj[feature_name_in_ds]\ # .where(z >= z_bto)\ # .where(z <= z_top).notnull().sum(dim=dim) == Nz # mask = self._obj[feature_name_in_ds].where( # self._obj[dim]>=z_bto).notnull().sum(dim=dim) == len(np.where(self._obj[dim]>=z_bto)[0]) else: mask = self._obj[feature_name_in_ds].notnull() mask = mask.rename('pcm_MASK') M.append(mask) mask = xr.concat(M, dim='n_features') mask = mask.sum(dim='n_features') mask = mask == this_pcm.F return mask
[docs] @pcm_method def fit(self, this_pcm, **kwargs): this_pcm.fit(self._obj, **kwargs)
[docs] @pcm_method def predict(self, this_pcm, inplace=False, **kwargs): """ Map this :class:`xarray.Dataset` on :func:`pyxpcm.pcm.predict` """ da = this_pcm.predict(self._obj, **kwargs) if inplace: return self.add(da) else: return da
[docs] @pcm_method def fit_predict(self, this_pcm, **kwargs): """ Map this :class:`xarray.Dataset` on :func:`pyxpcm.pcm.fit_predict` """ return this_pcm.fit_predict(self._obj, **kwargs)
[docs] @pcm_method def predict_proba(self, this_pcm, **kwargs): """ Map this :class:`xarray.Dataset` on :func:`pyxpcm.pcm.predict_proba` """ return this_pcm.predict_proba(self._obj, **kwargs)
[docs] @pcm_method def score(self, this_pcm, **kwargs): """ Map this :class:`xarray.Dataset` on :func:`pyxpcm.pcm.score` """ return this_pcm.score(self._obj, **kwargs)
[docs] @pcm_method def bic(self, this_pcm, **kwargs): """ Map this :class:`xarray.Dataset` on :func:`pyxpcm.pcm.bic` """ return this_pcm.bic(self._obj, **kwargs)
[docs] @pcm_stat_method def quantile(self, this_pcm, inplace=False, **kwargs): """ Map this :class:`xarray.Dataset` on :meth:`pyxpcm.pcm.stat.quantile` """ da = this_pcm.stat.quantile(self._obj, **kwargs) if inplace: return self.add(da) else: return da
[docs] @pcm_stat_method def robustness(self, this_pcm, inplace=False, **kwargs): """ Map this :class:`xarray.Dataset` on :meth:`pyxpcm.pcm.stat.robustness` """ da = this_pcm.stat.robustness(self._obj, **kwargs) if inplace: return self.add(da) else: return da
[docs] @pcm_stat_method def robustness_digit(self, this_pcm, inplace=False, **kwargs): """ Map this :class:`xarray.Dataset` on :meth:`pyxpcm.pcm.stat.robustness_digit` """ da = this_pcm.stat.robustness_digit(self._obj, **kwargs) if inplace: return self.add(da) else: return da