Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduction of ATOL in finite vectorisation method #348

Merged
merged 20 commits into from
Jul 23, 2020
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 146 additions & 3 deletions src/python/gudhi/representations/vector_methods.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# This file is part of the Gudhi Library - https://gudhi.inria.fr/ - which is released under MIT.
# See file LICENSE or go to https://gudhi.inria.fr/licensing/ for full license details.
# Author(s): Mathieu Carrière
# Author(s): Mathieu Carrière, Martin Royer
#
# Copyright (C) 2018-2019 Inria
# Copyright (C) 2018-2020 Inria
#
# Modification(s):
# - YYYY/MM Author: Description of the modification
# - 2020/06 Martin: ATOL integration

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler
from sklearn.neighbors import DistanceMetric
from sklearn.metrics import pairwise

from .preprocessing import DiagramScaler, BirthPersistenceTransform

Expand Down Expand Up @@ -574,3 +575,145 @@ def __call__(self, diag):
numpy array with shape (**threshold**): output complex vector of coefficients.
"""
return self.fit_transform([diag])[0,:]

def _lapl_contrast(measure, centers, inertias, eps=1e-8):
    """Laplacian-type contrast function for vectorising `measure` in ATOL.

    Returns exp(-sqrt(d(x, c) / inertia_c)) for every point x of `measure`
    and every center c; `eps` guards against a zero inertia.
    """
    scaled_dists = pairwise.pairwise_distances(measure, Y=centers) / (inertias + eps)
    return np.exp(-np.sqrt(scaled_dists))

def _gaus_contrast(measure, centers, inertias, eps=1e-8):
    """Gaussian-type contrast function for vectorising `measure` in ATOL.

    Returns exp(-d(x, c) / inertia_c) for every point x of `measure`
    and every center c; `eps` guards against a zero inertia.
    """
    scaled_dists = pairwise.pairwise_distances(measure, Y=centers) / (inertias + eps)
    return np.exp(-scaled_dists)

def _indicator_contrast(diags, centers, inertias, eps=1e-8):
    """Indicator ("robe") contrast function for vectorising `measure` in ATOL.

    The contrast is 1 for points within a center's inertia radius, then decays
    linearly from 1 to 0 between one and two inertia radii, and is 0 beyond.
    `eps` guards against a zero inertia.
    """
    pair_dist = pairwise.pairwise_distances(diags, Y=centers)
    radius = inertias + eps
    # flat part: indicator of the disk of radius `inertia` around each center
    flat_circ = (pair_dist < radius).astype(int)
    # decaying part: (2 - d/r) on the annulus r < d, clipped at 0 so the curve
    # never goes negative for d > 2r.  The original used np.positive, which is
    # the elementwise identity (+x) and performed no clipping at all.
    robe_curve = np.maximum(0, (2 - pair_dist / radius) * (radius < pair_dist).astype(int))
    return flat_circ + robe_curve

def _cloud_weighting(measure):
"""automatic uniform weighting with mass 1 for `measure` in ATOL"""
return np.ones(shape=measure.shape[0])

def _iidproba_weighting(measure):
"""automatic uniform weighting with mass 1/N for `measure` in ATOL"""
return np.ones(shape=measure.shape[0]) / measure.shape[0]

class Atol(BaseEstimator, TransformerMixin):
"""
This class allows to vectorise measures (e.g. point clouds, persistence diagrams, etc) after a quantisation step.

ATOL paper: https://arxiv.org/abs/1909.13472

Example
--------
>>> from sklearn.cluster import KMeans
>>> from gudhi.representations.vector_methods import Atol
>>> import numpy as np
>>> a = np.array([[1, 2, 4], [1, 4, 0], [1, 0, 4]])
>>> b = np.array([[4, 2, 0], [4, 4, 0], [4, 0, 2]])
>>> c = np.array([[3, 2, -1], [1, 2, -1]])
>>> atol_vectoriser = Atol(quantiser=KMeans(n_clusters=2, random_state=202006))
>>> atol_vectoriser.fit(X=[a, b, c]).centers
array([[ 2. , 0.66666667, 3.33333333],
[ 2.6 , 2.8 , -0.4 ]])
>>> atol_vectoriser(a)
array([1.0769395 , 0.58394704])
>>> atol_vectoriser(c)
array([0.23559623, 1.02816136])
>>> atol_vectoriser.transform(X=[a, b, c])
array([[1.0769395 , 0.58394704],
[0.56203292, 1.04696684],
[0.23559623, 1.02816136]])
"""
def __init__(self, quantiser, weighting_method="cloud", contrast="gaussian"):
    """
    Constructor for the Atol measure vectorisation class.

    Parameters:
        quantiser (Object): Object with `fit` (sklearn API consistent) and `cluster_centers` and `n_clusters`
            attributes. This object will be fitted by the function `fit`.
        weighting_method (string): constant generic function for weighting the measure points
            choose from {"cloud", "iidproba"}
            (default: constant function, i.e. the measure is seen as a point cloud by default).
            This will have no impact if weights are provided along with measures all the way: `fit` and `transform`.
        contrast (string): constant function for evaluating proximity of a measure with respect to centers
            choose from {"gaussian", "laplacian", "indicator"}
            (default: gaussian contrast function, see page 3 in the ATOL paper).
    """
    contrast_functions = {
        "gaussian": _gaus_contrast,
        "laplacian": _lapl_contrast,
        "indicator": _indicator_contrast,
    }
    weighting_functions = {
        "cloud": _cloud_weighting,
        "iidproba": _iidproba_weighting,
    }
    self.quantiser = quantiser
    # unknown keywords silently fall back to the defaults (gaussian / cloud)
    self.contrast = contrast_functions.get(contrast, _gaus_contrast)
    self.weighting_method = weighting_functions.get(weighting_method, _cloud_weighting)
    # placeholder values, overwritten when `fit` is called
    self.centers = np.full((self.quantiser.n_clusters, 2), np.inf)
    self.inertias = np.full(self.quantiser.n_clusters, np.nan)

def fit(self, X, y=None, sample_weight=None):
    """
    Calibration step: fit centers to the sample measures and derive inertias between centers.

    Parameters:
        X (list N x d numpy arrays): input measures in R^d from which to learn center locations and inertias
            (measures can have different N).
        y: Ignored, present for API consistency by convention.
        sample_weight (list of numpy arrays): weights for each measure point in X, optional.
            If None, the object's weighting_method will be used.

    Returns:
        self
    """
    if not hasattr(self.quantiser, 'fit'):
        raise TypeError("quantiser %s has no `fit` attribute." % (self.quantiser))
    if len(X) < self.quantiser.n_clusters:
        # in case there are not enough observations for fitting the quantiser, we add random points in [0, 1]^2
        # @Martin: perhaps this behaviour is to be externalised and a warning should be raised instead
        random_points = np.random.rand(self.quantiser.n_clusters-len(X), X[0].shape[1])
        X.append(random_points)
    if sample_weight is None:
        sample_weight = np.concatenate([self.weighting_method(measure) for measure in X])

    measures_concat = np.concatenate(X)
    self.quantiser.fit(X=measures_concat, sample_weight=sample_weight)
    self.centers = self.quantiser.cluster_centers_
    if self.quantiser.n_clusters == 1:
        # a single center has no neighbouring center: with the generic rule its
        # inertia would be min over an all-inf row, i.e. inf. Use half the
        # diameter of the fitted sample instead.
        dist_centers = pairwise.pairwise_distances(measures_concat)
        np.fill_diagonal(dist_centers, 0)
        self.inertias = np.array([np.max(dist_centers)/2])
    else:
        # inertia of a center = half the distance to its nearest other center
        dist_centers = pairwise.pairwise_distances(self.centers)
        np.fill_diagonal(dist_centers, np.inf)
        self.inertias = np.min(dist_centers, axis=0)/2
    return self

def __call__(self, measure, sample_weight=None):
    """
    Apply measure vectorisation on a single measure.

    Parameters:
        measure (n x d numpy array): input measure in R^d.
        sample_weight (numpy array): optional weights for each measure point;
            if None, the object's weighting_method will be used.

    Returns:
        numpy array in R^self.quantiser.n_clusters.
    """
    if sample_weight is None:
        sample_weight = self.weighting_method(measure)
    # contrast of every measure point against every center, weighted and
    # accumulated per center
    contrasts = self.contrast(measure, self.centers, self.inertias.T)
    return np.sum(sample_weight * contrasts.T, axis=1)

def transform(self, X, sample_weight=None):
    """
    Apply measure vectorisation on a list of measures.

    Parameters:
        X (list N x d numpy arrays): input measures in R^d from which to learn center locations and inertias
            (measures can have different N).
        sample_weight (list of numpy arrays): weights for each measure point in X, optional.
            If None, the object's weighting_method will be used.

    Returns:
        numpy array with shape (number of measures) x (self.quantiser.n_clusters).
    """
    if sample_weight is None:
        sample_weight = [self.weighting_method(measure) for measure in X]
    vectorisations = []
    for measure, weights in zip(X, sample_weight):
        vectorisations.append(self(measure, sample_weight=weights))
    return np.stack(vectorisations)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something that we can consider parallelizing in the future (not in the initial PR), or is it expected that the costly part is fit and it would be useless to try and speed up transform, which is already very fast (much faster than whatever produced the diagrams in the first place)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ATOL does not necessarily apply to diagrams, it could be simpler objects that have near zero production cost.
You're right about parallelizing the transform: our research shows ATOL can be very useful even after a rough calibration, so it's very likely that there are situations where fit will necessarily be quick and dirty, but then there is a lot of transforming to be done.