#!/bin/env python
# -*- coding: UTF-8 -*-
#
"""
Provide basic methods for quick and easy plotting of/with PCM features
"""
# Import packages:
from . import models
from .utils import docstring
from contextlib import contextmanager
import warnings
# Import packages in the requirements.txt:
import numpy as np
import pandas as pd
from sklearn.utils import validation
import sklearn
# Import mandatory packages:
# Plotting back-ends are optional: a failed import only triggers a warning so
# that the rest of the package stays importable. ``ImportError`` (the parent
# of ``ModuleNotFoundError``) is caught so that a package that is installed
# but cannot be loaded is reported the same way as a missing one, while
# unrelated errors are no longer swallowed by a bare ``except``.
try:
    import matplotlib.pyplot as plt
    import matplotlib as mpl
    import matplotlib.cm as cm
    import matplotlib.colors as mcolors
    import matplotlib.ticker as mticker
except ImportError:
    warnings.warn("pyXpcm requires matplotlib installed for plotting functionality")

try:
    import cartopy
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
except ImportError:
    warnings.warn("pyXpcm requires cartopy installed for full plotting functionality")

# ``with_seaborn`` is the module-level flag read by the plotting functions to
# decide whether seaborn features can be used.
try:
    import seaborn as sns
except ImportError:
    warnings.warn("pyXpcm requires seaborn installed for full plotting functionality")
    with_seaborn = False
else:
    sns.set_context("paper")
    with_seaborn = True
# Let's start
@contextmanager
def axes_style(style="white"):
    """ Context manager running a plotting block with a seaborn axes style

    When the module-level flag ``with_seaborn`` is true, the managed block is
    executed inside ``sns.axes_style(style)``. Otherwise the block is executed
    as is, so callers do not have to care whether :mod:`seaborn` is available.

    Parameters
    ----------
    style : str
        Style name forwarded to :func:`seaborn.axes_style`
    """
    if not with_seaborn:
        # No seaborn: nothing to set up or restore around the block
        yield
        return
    with sns.axes_style(style):
        yield
def cmap_robustess():
    """ Return the 5-colors categorical colormap used for robustness

    Returns
    -------
    :class:`matplotlib.colors.ListedColormap`
    """
    robustness_colors = ['#FF0000', '#CC00FF', '#0066FF', '#CCFF00', '#00FF66']
    return mpl.colors.ListedColormap(robustness_colors)
def cmap_discretize(name, K):
    """Return a colormap with K discrete colors built from a named colormap

    Parameters
    ----------
    name :
        Name of the source colormap, eg 'Paired' or 'jet'. It is handed to
        :func:`matplotlib.pyplot.get_cmap`.
    K : int
        Number of colors in the returned colormap

    Returns
    -------
    :class:`matplotlib.colors.LinearSegmentedColormap`

    Notes
    -----
    Two cases are handled:

    - ``name`` is one of the listed qualitative colormaps: its native colors
      are sampled and, when K is larger than the native number of colors,
      colors are re-used cyclically from the first one.
    - any other colormap: it is sampled at K evenly spaced positions and
      turned into K constant-color segments.
    """
    # Native number of colors of the qualitative colormaps handled explicitly
    native_sizes = {'Set1': 9, 'Set2': 8, 'Set3': 12, 'Pastel1': 9,
                    'Pastel2': 8, 'Paired': 12, 'Dark2': 8, 'Accent': 8}
    source = plt.get_cmap(name=name)

    # Membership is tested against a list of the keys (plain == comparisons),
    # so ``name`` itself is never hashed.
    if name in list(native_sizes):
        # Segmented (or quantitative) colormap:
        n_native = native_sizes[name]
        positions = np.concatenate((np.linspace(0, 1., n_native), (0., 0., 0., 0.)), axis=0)
        rgba = source(positions)  # one RGBA row per sampled position
        # Indices 0..n_native-1, then wrapping around for every extra color
        picked = np.arange(max(n_native, K)) % n_native
        return mcolors.LinearSegmentedColormap.from_list(name + "_%d" % K, colors=rgba[picked, :], N=K)

    # Continuous colormap:
    n_colors = K
    positions = np.concatenate((np.linspace(0, 1., n_colors), (0., 0., 0., 0.)))
    rgba = source(positions)  # one RGBA row per sampled position
    edges = np.linspace(0, 1., n_colors + 1)
    # For each channel, segment i jumps from the previous color to color i at edges[i]
    segments = {
        channel: [(edges[i], rgba[i - 1, ich], rgba[i, ich]) for i in range(n_colors + 1)]
        for ich, channel in enumerate(('red', 'green', 'blue'))
    }
    return mcolors.LinearSegmentedColormap(source.name + "_%d" % n_colors, segments, n_colors)
def colorbar_index(ncolors, name, **kwargs):
    """Draw a colorbar for ``ncolors`` discrete colors, with integer tick labels

    Parameters
    ----------
    ncolors : int
        Number of discrete colors
    name :
        Colormap name handed to :func:`cmap_discretize`

    All other **kwargs are forwarded to :func:`matplotlib.pyplot.colorbar`

    Returns
    -------
    The colorbar object returned by :func:`matplotlib.pyplot.colorbar`
    """
    discrete_cmap = cmap_discretize(name, ncolors)

    # Stand-alone mappable: the colorbar is not tied to any plotted artist
    scalar_map = cm.ScalarMappable(cmap=discrete_cmap)
    scalar_map.set_array([])
    scalar_map.set_clim(-0.5, ncolors + 0.5)

    cbar = plt.colorbar(scalar_map, **kwargs)
    tick_positions = np.linspace(0, ncolors, ncolors)
    cbar.set_ticks(tick_positions)
    cbar.set_ticklabels(range(ncolors))
    return cbar
def latlongrid(ax, dx=5., dy=5., fontsize=6, **kwargs):
    """ Add latitude/longitude grid line and labels to a cartopy geoaxes

    Parameters
    ----------
    ax : :class:`cartopy.mpl.geoaxes.GeoAxesSubplot`
        The axes to add grid lines to
    dx : float
        Spacing of the longitude grid lines, laid out from -180 to 180
    dy : float
        Spacing of the latitude grid lines, laid out from -90 to 90
    fontsize : int
        Font size of the grid labels

    All other **kwargs are forwarded to ``ax.gridlines`` and override the
    default line style (thin dashed gray lines).

    Returns
    -------
    gl :
        The gridliner object returned by ``ax.gridlines``

    Raises
    ------
    ValueError
        If ``ax`` is not a cartopy GeoAxesSubplot instance
    """
    if not isinstance(ax, cartopy.mpl.geoaxes.GeoAxesSubplot):
        raise ValueError("Please provide a cartopy.mpl.geoaxes.GeoAxesSubplot instance")

    defaults = {'linewidth': .5, 'color': 'gray', 'alpha': 0.5, 'linestyle': '--'}
    gl = ax.gridlines(crs=ax.projection, draw_labels=True, **{**defaults, **kwargs})
    gl.xlocator = mticker.FixedLocator(np.arange(-180, 180 + 1, dx))
    gl.ylocator = mticker.FixedLocator(np.arange(-90, 90 + 1, dy))
    gl.xformatter = LONGITUDE_FORMATTER
    gl.yformatter = LATITUDE_FORMATTER

    # Hide labels on the top and right sides. cartopy 0.18 renamed
    # ``xlabels_top``/``ylabels_right`` to ``top_labels``/``right_labels``;
    # setting the old names on a recent gridliner has no effect, so pick
    # whichever pair this gridliner actually exposes.
    if hasattr(gl, 'top_labels'):
        gl.top_labels = False
        gl.right_labels = False
    else:
        gl.xlabels_top = False
        gl.ylabels_right = False

    gl.xlabel_style = {'fontsize': fontsize}
    gl.ylabel_style = {'fontsize': fontsize}
    return gl
def cmap(m, name, palette=False, usage='class'):
    """Return categorical colormaps

    Parameters
    ----------
    m :
        A PCM instance; its ``K`` attribute gives the number of colors for
        the 'class' usage
    name : str
        Name of the colormap, ex: 'Set2'
    palette : bool
        Whether to return a Seaborn color palette or not.

        - False (default): function returns a :class:``matplotlib.colors.LinearSegmentedColormap``
        - True: function returns a :func:``seaborn.color_palette``
    usage : str
        The intended usage of the colormap, this can be:

        - 'class' (default): one color per class
        - 'robustness' : a 5-colors for probability ranges
          (``name`` and ``palette`` are ignored in this case)

    Returns
    -------
    :class:``matplotlib.colors.LinearSegmentedColormap`` or :func:``seaborn.color_palette``

    Raises
    ------
    ValueError
        If ``usage`` is unknown, or if ``palette=True`` is requested while
        seaborn is not available
    """
    if usage == 'robustness':
        # Single source of truth for the robustness colors
        return cmap_robustess()
    if usage != 'class':
        raise ValueError("Unknown 'usage' value (%s) " % usage)

    # usage == 'class': one color per PCM class
    if not palette:
        return cmap_discretize(name, m.K)
    if not with_seaborn:
        raise ValueError("Require seaborn install for palette=True")
    return sns.color_palette(name, m.K)
def colorbar(m, cmap=None, **kwargs):
    """Add a colorbar to the current plot with centered ticks on discrete colors

    Parameters
    ----------
    m :
        A PCM instance; ``m.K`` is used as the number of colors
    cmap :
        Colormap forwarded to :func:`colorbar_index`. When None, the value
        stored in ``m._props['cmap']`` is used, or ``m.plot.cmap()`` if that
        one is None too.

    All other **kwargs are forwarded to :func:`colorbar_index`, on top of the
    defaults ``fraction=0.03`` and ``label='Class'``.

    Returns
    -------
    The colorbar object returned by :func:`colorbar_index`
    """
    # Identity tests: ``== None`` would be delegated to the object's own __eq__
    if cmap is None:
        cmap = m._props['cmap']
        if cmap is None:
            cmap = m.plot.cmap()

    z = {'fraction': 0.03, 'label': 'Class', **kwargs}
    # NOTE(review): colorbar_index() has a required ``name`` parameter that is
    # only supplied here when the caller passes ``name=...`` through kwargs.
    return colorbar_index(ncolors=m.K, cmap=cmap, **z)
def subplots(m, maxcols=3, K=np.inf, subplot_kw=None, **kwargs):
    """ Return (figure, axis) with one subplot per cluster

    Parameters
    ----------
    m : :class:`pyxpcm.models.pcm`
        A PCM instance
    maxcols : int
        Maximum number of columns to use
    K : int
        The number of subplot required (:func:`pyxpcm.models.pcm.K` by default)
    subplot_kw : dict()
        Arguments to be submitted to the :class:`matplotlib.pyplot.subplots` subplot_kw options.

    All other **kwargs are forwarded to :class:`matplotlib.pyplot.subplots`

    Returns
    -------
    fig : :class:`matplotlib.pyplot.figure.Figure`
    ax : flat :class:`numpy.ndarray` of :class:`matplotlib.axes.Axes`
        The axes grid flattened to 1D. Axes beyond the K-th one are removed
        from the figure.

    Examples
    --------
    fig, ax = m.plot.subplots(maxcols=4, sharey=True, figsize=(12,6))

    __author__: gmaze@ifremer.fr
    """
    if K == np.inf:
        K = m.K

    if K > maxcols:
        ncols = maxcols
        # Just enough rows to hold K axes; a plain ``1 + K // maxcols`` would
        # add an empty row whenever K is a multiple of maxcols.
        nrows = int(np.ceil(K / maxcols))
    else:
        ncols = K
        nrows = 1
    if ncols == 1:
        nrows = K

    # Only forward subplot_kw when the caller provided one
    options = dict(kwargs)
    if subplot_kw:
        options['subplot_kw'] = subplot_kw
    fig, ax = plt.subplots(nrows=nrows, ncols=ncols, **options)

    # Always hand back a flat array, even for a single Axes
    ax = np.array(ax).flatten()
    # Remove the unused trailing axes of the grid
    for i in range(K, nrows * ncols):
        fig.delaxes(ax[i])
    return fig, ax
def timeit(m, group='Method', split='Sub-method', subplot_kw=None, style='white', unit='ms', **kwargs):
    """ Plot PCM registered timing of operations

    Parameters
    ----------
    m :
        A PCM instance, providing ``m.timeit`` (the timings to plot) and
        ``m._timeit`` (whose dot-separated keys give the number of levels)
    group : str
        First grouping name, only used for grouping when timings have more
        than 2 levels. Always used as the y-axis label.
    split : str
        Second grouping name, only used when timings have more than 2 levels
    subplot_kw : dict()
        Forwarded to :func:`matplotlib.pyplot.subplots` when provided
    style : str
        Style forwarded to :func:`axes_style`
    unit : str
        Time unit: timings are in milli-seconds and are converted when unit
        is 's', 'm' or 'h'. Any other value leaves the timings untouched and
        is only used in the x-axis label.

    All other **kwargs are forwarded to :func:`matplotlib.pyplot.subplots`

    Returns
    -------
    fig, ax, df
    """
    # Read timings:
    df = m.timeit

    # Default timeit unit is milli-seconds: apply the successive divisions
    # required by the requested unit (none for an unlisted unit)
    conversions = {'s': (1000.,), 'm': (1000., 60.), 'h': (1000., 60., 60.)}
    for divisor in conversions.get(unit, ()):
        df = df / divisor

    # Get max levels (keys are dot-separated paths):
    max_dpt = np.max([len(key.split(".")) for key in m._timeit])

    with axes_style(style):
        fig_opts = {'figsize': (5, 3), 'dpi': 90, **kwargs}
        if subplot_kw:
            fig_opts['subplot_kw'] = subplot_kw
        fig, ax = plt.subplots(**fig_opts)

        if max_dpt == 1:  # 1 Level:
            df.plot(kind='barh', ax=ax)

        if max_dpt == 2:  # 2 Levels:
            df.plot(kind='barh', stacked=1, legend=1, subplots=0, ax=ax)

        if max_dpt > 2:
            # Select 2 dimensions to plot:
            df = df.groupby([group, split]).sum()
            df = df.unstack(0)
            # Discard totals and unnamed entries
            if 'total' in df.index:
                df.drop('total', inplace=True)
            if 'total' in df.keys():
                df.drop('total', axis=1, inplace=True)
            if '' in df.index:
                df.drop('', inplace=True)
            df = df.T
            # Keep only rows with a non-zero sum
            df = df[df.sum(axis=1) != 0]
            df.plot(kind='barh', stacked=1, legend=0, subplots=0, ax=ax)
            plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5))

        if with_seaborn:
            sns.despine()
        ax.grid(True)
        ax.set_xlabel("Time [%s]" % unit)
        ax.set_ylabel(group)
    return fig, ax, df
def preprocessed(m, ds, features=None, dim=None, n=1000, kde=False, style='darkgrid', **kargs):
    """ Plot preprocessed features as pairwise scatter plots

    Require :mod:`seaborn`

    Parameters
    ----------
    m : :class:`pyxpcm.pcm` instance
    ds: :class:`xarray.Dataset`
        The dataset to work with
    features: dict()
        Definitions of PCM features in the input :class:`xarray.Dataset`.
        If not specified or set to None, features are identified using :class:`xarray.DataArray` attributes 'feature_name'.
    dim :
        Forwarded to ``m.preprocessing``
    n : int
        Number of samples to use in scatter plots (capped to the number of samples available)
    kde : bool
        If False (default), the diagonal shows histograms. If True, the
        diagonal and the lower triangle show kernel density estimates.
    style : str
        Style forwarded to :func:`seaborn.axes_style`

    All other **kargs are forwarded to :class:`seaborn.PairGrid`

    Returns
    -------
    g : :class:`seaborn.axisgrid.PairGrid`
        Seaborn Pairgrid instance

    Raises
    ------
    ValueError
        If seaborn is not available

    __author__: gmaze@ifremer.fr
    """
    if not with_seaborn:
        raise ValueError("Seaborn is required for this function")

    # Get preprocessed features (the array handed to the classifier below)
    X, _sampling_dims = m.preprocessing(ds, features=features, dim=dim)

    # Create a dataframe for seaborn plotting machinery, with one extra
    # 'labels' entry holding the predicted class of each sample:
    df = X.to_dataframe('features').unstack(0)
    df.loc['labels'] = m._classifier.predict(X)
    df = df.T

    # Random subset of rows to plot
    n_rows = X.shape[0]
    random_rows = np.random.choice(range(n_rows), np.min((n, n_rows)), replace=False)

    # Seaborn PairGrid plot:
    with sns.axes_style(style):
        grid_opts = {'height': 2.5, 'aspect': 1, 'hue': 'labels', 'palette': m.plot.cmap(palette=True),
                     'vars': m._xlabel, 'despine': False, **kargs}
        g = sns.PairGrid(df.iloc[random_rows], **grid_opts)
        # The upper triangle is a scatter plot in both modes
        g = g.map_upper(plt.scatter, s=3)
        if kde:
            g = g.map_lower(sns.kdeplot, linewidths=1)
            g = g.map_diag(sns.kdeplot, lw=2, legend=False)
        else:
            g = g.map_diag(plt.hist, edgecolor=None, alpha=0.75)
        g = g.add_legend()
    return g
def scaler(m, style="whitegrid", plot_kw=None, subplot_kw=None, **kwargs):
    """Plot PCM scalers properties

    One row of two axes is drawn per feature: scaler mean on the left, scaler
    scale (std) on the right. For a feature with no axis (a slice), the two
    values are only reported in the axes titles.

    Parameters
    ----------
    m : :class:`pyxpcm.pcm` instance
    style : str
        Style forwarded to :func:`axes_style`
    plot_kw : dict()
        Options overriding the default line properties of the profiles
    subplot_kw : dict()
        Forwarded to :func:`matplotlib.pyplot.subplots` when provided

    All other **kwargs are forwarded to :func:`matplotlib.pyplot.subplots`

    Returns
    -------
    fig, ax
    """
    # Check if the PCM is trained:
    validation.check_is_fitted(m, 'fitted')

    line_overrides = plot_kw if plot_kw else {}

    # Plot
    with axes_style(style):
        fig_opts = {'sharey': 'row', 'figsize': (10, 5 * m.F), 'dpi': 80, 'facecolor': 'w', 'edgecolor': 'k',
                    **kwargs}
        if subplot_kw:
            fig_opts['subplot_kw'] = subplot_kw
        fig, ax = plt.subplots(ncols=2, nrows=m.F, **fig_opts)
        if m.F == 1:
            # Keep a 2D [row, column] indexing with a single feature
            ax = ax[np.newaxis, :]

        for feature, irow in zip(m._props['features'], np.arange(0, m.F)):
            X_ave = m._scaler[feature].mean_
            X_std = m._scaler[feature].scale_
            X_unit = m._scaler_props[feature]['units']
            feature_axis = m._props['features'][feature]
            left, right = ax[irow, 0], ax[irow, 1]

            # Is this a thick array or a slice ?
            # (``== None`` on purpose: the comparison must stay elementwise
            # when the feature axis is an array)
            if np.all(feature_axis == None):
                left.set_title("%s scaler mean=%f" % (feature, X_ave), fontsize=10)
                right.set_title("%s scaler std=%f" % (feature, X_std), fontsize=10)
                continue

            left.plot(X_ave, feature_axis, **{'linewidth': 2, 'label': 'Sample Mean', **line_overrides})
            right.plot(X_std, feature_axis, **{'linewidth': 2, 'label': 'Sample Std', **line_overrides})

            # tidy up the figure
            left.set_ylabel('Vertical feature axis')
            for panel in (left, right):
                panel.legend(loc='lower right')
                panel.set_xlabel("[%s]" % X_unit)
                panel.set_title("%s scaler" % feature, fontsize=10)
    return fig, ax
def reducer(m, pcalist=None, style="whitegrid", maxcols=np.inf, plot_kw=None, subplot_kw=None, **kwargs):
    """ Plot PCM reducers properties

    One subplot is drawn per feature. For a feature reduced with a PCA, the
    selected components (EOFs) are plotted against the feature axis. For other
    reducers, only the subplot title is set.

    Parameters
    ----------
    m : :class:`pyxpcm.pcm` instance
    pcalist : list of int
        Indices (0-based) of the PCA components to plot. By default, all the
        components of each feature reducer are plotted.
    style : str
        Style forwarded to :func:`axes_style`
    maxcols : int
        Maximum number of subplot columns (one column per feature by default)
    plot_kw : dict()
        Options overriding the default line properties of the EOF curves
    subplot_kw : dict()
        Forwarded to ``m.plot.subplots`` when provided

    All other **kwargs are forwarded to ``m.plot.subplots``

    Returns
    -------
    fig, ax

    Raises
    ------
    ValueError
        If a requested component index is not available for a feature
    """
    # Check if the PCM is trained:
    validation.check_is_fitted(m, 'fitted')

    line_overrides = plot_kw if plot_kw else {}

    # Plot
    with axes_style(style):
        fig_opts = {'sharey': 'row', 'figsize': (5 * m.F, 5), 'dpi': 80, 'facecolor': 'w', 'edgecolor': 'k',
                    **kwargs}
        if subplot_kw:
            fig_opts['subplot_kw'] = subplot_kw
        ncols = m.F if maxcols == np.inf else maxcols
        fig, ax = m.plot.subplots(K=m.F, maxcols=ncols, **fig_opts)

        for feature, icol in zip(m._props['features'], range(m.F)):
            this_reducer = m._reducer[feature]
            ax[icol].set_title(feature, fontsize=10)

            if isinstance(this_reducer, sklearn.decomposition.PCA):
                X_eof = this_reducer.components_
                n_eof = X_eof.shape[0]

                # Resolve the components for THIS feature only: the default
                # list must not leak from one feature to the next, since
                # reducers may not all have the same number of components.
                components = range(0, n_eof) if pcalist is None else pcalist
                # Valid indices are 0..n_eof-1, hence ">="
                if np.max(components) >= n_eof:
                    raise ValueError("PCA number %i is not available in reduced %s" % (np.max(components), feature))

                feature_axis = m._props['features'][feature]
                for ic in components:
                    line_opts = {'linewidth': 1, 'label': 'EOF #%i' % ic, **line_overrides}
                    ax[icol].plot(X_eof[ic, :], feature_axis, **line_opts)

                # tidy up the figure
                ax[icol].axvline(x=0, color='k')
                ax[icol].legend(loc='lower right')
                if icol == 0:
                    ax[icol].set_ylabel('Vertical feature axis')

            elif isinstance(this_reducer, models.NoTransform):
                ax[icol].set_title('No reducer for %s' % feature, fontsize=10)

            else:
                ax[icol].set_title('Unknown reducer for %s !' % feature, fontsize=10)
    return fig, ax
def quantile(m, da, xlim=None,
             classdimname='pcm_class',
             quantdimname='quantile',
             maxcols=3, cmap=None,
             ylabel='feature dimension',
             **kwargs):
    """Plot q-th quantiles of a dataArray for each PCM components

    Parameters
    ----------
    m : :class:`pyxpcm.pcm` instance
    da: :class:`xarray.DataArray` with quantiles
        Expected with 3 dimensions: classes, quantiles and a vertical axis
    xlim :
        x-axis limits shared by all subplots. By default, the data range
        widened by 10% of the absolute value of each bound.
    classdimname : str
        Name of the class dimension. If ``da`` has no such dimension, the
        class dimension is the one with length ``m.K``.
    quantdimname : str
        Name of the quantile dimension
    maxcols : int
        Maximum number of subplot columns
    cmap :
        Callable mapping a quantile index to a color. By default, a
        discretized 'brg' colormap with one color per quantile.
    ylabel : str
        y-axis label of the first subplot

    All other **kwargs are forwarded to ``m.plot.subplots``

    Returns
    -------
    fig : :class:`matplotlib.pyplot.figure.Figure`
    ax : array of :class:`matplotlib.axes.Axes` objects, as returned by
        ``m.plot.subplots``

    Raises
    ------
    ValueError
        If the class dimension can't be identified
    """
    # Check if the PCM is trained:
    validation.check_is_fitted(m, 'fitted')

    # da must be 3D with a dimension for: CLASS, QUANTILES and a vertical axis
    # The CLASS dimension is either named explicitly or identified as the
    # only one matching m.K length.
    if classdimname in da.dims:
        CLASS_DIM = classdimname
    else:
        matches = np.flatnonzero(np.array(da.shape) == m.K)
        if matches.size > 1:
            raise ValueError("Can't distinguish the class dimension from the others")
        if matches.size == 0:
            raise ValueError("Can't find a class dimension ('%s' or of length %i)" % (classdimname, m.K))
        CLASS_DIM = da.dims[matches[0]]
    QUANT_DIM = quantdimname
    VERTICAL_DIM = list(set(da.dims) - set([CLASS_DIM]) - set([QUANT_DIM]))[0]

    nQ = len(da[QUANT_DIM])  # Nb of quantiles

    cmapK = m.plot.cmap()  # One color per class, used for the titles
    if cmap is None:
        # Pass the colormap by name: matplotlib.cm.get_cmap is no longer
        # available in recent matplotlib releases.
        cmap = cmap_discretize('brg', nQ)

    defaults = {'figsize': (10, 8), 'dpi': 80, 'facecolor': 'w', 'edgecolor': 'k'}
    fig, ax = m.plot.subplots(maxcols=maxcols, **{**defaults, **kwargs})

    if xlim is None:
        # Widen each bound away from the data. This equals 0.9*min and
        # 1.1*max for positive data, without cropping negative values.
        vmin, vmax = float(da.min()), float(da.max())
        xlim = np.array([vmin - 0.1 * abs(vmin), vmax + 0.1 * abs(vmax)])
    ylim = np.array([da[VERTICAL_DIM].min(), da[VERTICAL_DIM].max()])

    for k in m:
        Qk = da.loc[{CLASS_DIM: k}]
        for iq, q in enumerate(Qk[QUANT_DIM]):
            Qkq = Qk.loc[{QUANT_DIM: q}]
            ax[k].plot(Qkq.values.T, da[VERTICAL_DIM], label=("%0.2f") % (Qkq[QUANT_DIM]), color=cmap(iq))
        ax[k].set_title(("Component: %i") % (k), color=cmapK(k))
        ax[k].legend(loc='lower right')
        ax[k].set_xlim(xlim)
        ax[k].set_ylim(ylim)
        if k == 0:
            ax[k].set_ylabel(ylabel)
        ax[k].grid(True)
    plt.tight_layout()
    return fig, ax
class _PlotMethods(object):
    """
    Enables use of pyxpcm.plot functions as attributes on a PCM object.

    For example: m.plot.scaler(), m.plot.cmap(name='Set2'), m.plot.colorbar()

    Each method forwards to the module-level function of the same name, with
    the PCM instance as first argument. The instance also remembers the name
    of the last colormap requested for classes, so that ``cmap`` and
    ``colorbar`` use the same colors by default.
    """

    def __init__(self, m):
        self._pcm = m
        # Name of the colormap used for classes by default
        self._kmap = 'Set1'

    def __call__(self, **kwargs):
        raise ValueError("plot cannot be called directly. Use one of the plotting methods: cmap, colorbar, subplots, scaler, reducer, timeit, preprocessed")

    def cmap(self, **kwargs):
        """Return a categorical colormap for this PCM

        Keyword arguments are forwarded to the module-level :func:`cmap`;
        ``name`` defaults to the last colormap name used for classes.
        """
        opts = {'name': self._kmap, **kwargs}
        c = cmap(self._pcm, **opts)
        # 'class' is the default usage of cmap(): remember the name also when
        # ``usage`` is not given explicitly, so that colorbar() follows.
        if opts.get('usage', 'class') == 'class':
            self._kmap = opts['name']
        return c

    @docstring(colorbar.__doc__)
    def colorbar(self, **kwargs):
        if self._kmap:
            opts = {'name': self._kmap, **kwargs}
        else:
            opts = kwargs
        return colorbar(self._pcm, **opts)

    @docstring(subplots.__doc__)
    def subplots(self, **kwargs):
        return subplots(self._pcm, **kwargs)

    @docstring(latlongrid.__doc__)
    def latlongrid(self, ax, **kwargs):
        return latlongrid(ax, **kwargs)

    @docstring(scaler.__doc__)
    def scaler(self, **kwargs):
        return scaler(self._pcm, **kwargs)

    @docstring(reducer.__doc__)
    def reducer(self, **kwargs):
        return reducer(self._pcm, **kwargs)

    @docstring(timeit.__doc__)
    def timeit(self, **kwargs):
        return timeit(self._pcm, **kwargs)

    @docstring(preprocessed.__doc__)
    def preprocessed(self, ds, **kwargs):
        return preprocessed(self._pcm, ds, **kwargs)

    @docstring(quantile.__doc__)
    def quantile(self, da, **kwargs):
        return quantile(self._pcm, da, **kwargs)