import torch
from gpytorch.kernels.kernel import Kernel
from torch import Tensor


class HammingKernelWithOneHots(Kernel):
    """Exponentiated Hamming-style kernel evaluated on one-hot columns.

    The covariance is ``exp(-dist(x1, x2))`` where ``dist`` is half of the
    lengthscale-scaled squared difference, summed over the active columns.

    The halving is meant for inputs that are concatenated one-hot encodings
    (as the class name suggests): two rows that disagree on one categorical
    feature differ in exactly two columns, so with unit lengthscales the
    halved sum counts each mismatching feature once.
    """

    has_lengthscale = True

    def forward(
        self,
        x1: Tensor,
        x2: Tensor,
        diag: bool = False,
        last_dim_is_batch: bool = False,
        **params,
    ) -> Tensor:
        """Compute the covariance between the rows of ``x1`` and ``x2``.

        Args:
            x1: Tensor whose last two dims are ``(n, d)``.
            x2: Tensor whose last two dims are ``(m, d)``.
            diag: If True, return only the diagonal of the covariance
                (taken over the last two dims of the full result).
            last_dim_is_batch: If True, do not reduce over the ``d`` columns;
                each column becomes its own leading batch entry instead.
            **params: Extra keyword arguments; accepted for compatibility
                with callers that forward additional options, and ignored.

        Returns:
            ``(..., n, m)`` covariance, or ``(..., d, n, m)`` when
            ``last_dim_is_batch`` is set; with ``diag`` the trailing
            ``(n, m)`` pair is replaced by its diagonal.
        """
        # Pairwise squared differences, shape (..., n, m, d).
        sq_diff = (x1.unsqueeze(-2) - x2.unsqueeze(-3)) ** 2
        # The extra unsqueeze lets the lengthscale broadcast over (n, m).
        scaled = sq_diff / self.lengthscale.unsqueeze(-2)

        if last_dim_is_batch:
            # Keep one kernel per column: move d in front of (n, m) and do
            # NOT reduce. The previous code transposed and then summed over
            # the last axis, which collapsed a data dimension instead of d.
            # The product of the per-column kernels equals the full kernel.
            dists = scaled.movedim(-1, -3) / 2
        else:
            dists = scaled.sum(-1) / 2

        res = torch.exp(-dists)
        if diag:
            res = torch.diagonal(res, dim1=-1, dim2=-2)
        return res
f05baf790..7d860963e 100644 --- a/bofire/kernels/mapper.py +++ b/bofire/kernels/mapper.py @@ -7,6 +7,7 @@ import bofire.data_models.kernels.api as data_models import bofire.priors.api as priors +from bofire.kernels.categorical import HammingKernelWithOneHots from bofire.kernels.fingerprint_kernels.tanimoto_kernel import TanimotoKernel from bofire.kernels.shape import WassersteinKernel @@ -215,13 +216,35 @@ def map_HammingDistanceKernel( ard_num_dims: int, active_dims: List[int], features_to_idx_mapper: Optional[Callable[[List[str]], List[int]]], -) -> CategoricalKernel: +) -> GpytorchKernel: active_dims = _compute_active_dims(data_model, active_dims, features_to_idx_mapper) - return CategoricalKernel( - batch_shape=batch_shape, - ard_num_dims=len(active_dims) if data_model.ard else None, - active_dims=active_dims, # type: ignore - ) + + if data_model.with_one_hots is None: + with_one_hots = data_model.features is not None and len(active_dims) > 1 + else: + with_one_hots = data_model.with_one_hots + + if with_one_hots and len(active_dims) == 1: + raise RuntimeError( + "only one feature for categorical kernel operating on one-hot features" + ) + elif not with_one_hots and len(active_dims) > 1: + # this is not necessarily an issue since botorch's CategoricalKernel + # can work on multiple features at the same time + pass + + if with_one_hots: + return HammingKernelWithOneHots( + batch_shape=batch_shape, + ard_num_dims=len(active_dims) if data_model.ard else None, + active_dims=active_dims, # type: ignore + ) + else: + return CategoricalKernel( + batch_shape=batch_shape, + ard_num_dims=len(active_dims) if data_model.ard else None, + active_dims=active_dims, # type: ignore + ) def map_WassersteinKernel( diff --git a/scratch.py b/scratch.py new file mode 100644 index 000000000..15caab666 --- /dev/null +++ b/scratch.py @@ -0,0 +1,132 @@ +import pandas as pd + +import bofire.strategies.api as strategies +import bofire.surrogates.api as surrogates +from 
bofire.data_models.domain import api as domain_api +from bofire.data_models.features import api as features_api +from bofire.data_models.kernels import api as kernels_api +from bofire.data_models.molfeatures import api as molfeatures_api +from bofire.data_models.priors.api import HVARFNER_LENGTHSCALE_PRIOR +from bofire.data_models.strategies import api as strategies_api +from bofire.data_models.surrogates import api as surrogates_api + + +def test_SingleTaskGPModel_mixed_features(): + """test that we can use a single task gp with mixed features""" + inputs = domain_api.Inputs( + features=[ + features_api.ContinuousInput( + key=f"x_{i+1}", + bounds=(-4, 4), + ) + for i in range(2) + ] + + [ + features_api.CategoricalInput(key="x_cat_1", categories=["mama", "papa"]), + features_api.CategoricalInput(key="x_cat_2", categories=["cat", "dog"]), + ] + ) + outputs = domain_api.Outputs(features=[features_api.ContinuousOutput(key="y")]) + experiments = inputs.sample(n=10) + experiments.eval("y=((x_1**2 + x_2 - 11)**2+(x_1 + x_2**2 -7)**2)", inplace=True) + experiments.loc[experiments.x_cat_1 == "mama", "y"] *= 5.0 + experiments.loc[experiments.x_cat_1 == "papa", "y"] /= 2.0 + experiments.loc[experiments.x_cat_2 == "cat", "y"] *= -2.0 + experiments.loc[experiments.x_cat_2 == "dog", "y"] /= -5.0 + experiments["valid_y"] = 1 + + gp_data = surrogates_api.SingleTaskGPSurrogate( + inputs=inputs, + outputs=outputs, + kernel=kernels_api.AdditiveKernel( + kernels=[ + kernels_api.HammingDistanceKernel( + ard=True, + features=["x_cat_1", "x_cat_2"], + ), + kernels_api.RBFKernel( + ard=True, + lengthscale_prior=HVARFNER_LENGTHSCALE_PRIOR(), + features=[f"x_{i+1}" for i in range(2)], + ), + ] + ), + ) + + gp_mapped = surrogates.map(gp_data) + assert hasattr(gp_mapped, "fit") + assert len(gp_mapped.kernel.kernels) == 2 + assert gp_mapped.kernel.kernels[0].features == ["x_cat_1", "x_cat_2"] + assert gp_mapped.kernel.kernels[1].features == ["x_1", "x_2"] + gp_mapped.fit(experiments) + pred 
def test_SingleTaskGPModel_mixed_features():
    """Fit a single-task GP on continuous plus categorical inputs.

    Builds a surrogate with an additive kernel (Hamming distance on the two
    categorical inputs, RBF on the two continuous inputs), fits it on ten
    sampled experiments and checks the prediction frame, the training error
    and the active dimensions assigned to each mapped kernel.
    """
    continuous_features = [
        ContinuousInput(key=f"x_{i+1}", bounds=(-4, 4)) for i in range(2)
    ]
    categorical_features = [
        CategoricalInput(key="x_cat_1", categories=["mama", "papa"]),
        CategoricalInput(key="x_cat_2", categories=["cat", "dog"]),
    ]
    inputs = Inputs(features=continuous_features + categorical_features)
    outputs = Outputs(features=[ContinuousOutput(key="y")])

    # Synthetic response: a function of the continuous inputs, rescaled
    # depending on the level of each categorical input.
    data = inputs.sample(n=10)
    data.eval("y=((x_1**2 + x_2 - 11)**2+(x_1 + x_2**2 -7)**2)", inplace=True)
    data.loc[data["x_cat_1"] == "mama", "y"] *= 5.0
    data.loc[data["x_cat_1"] == "papa", "y"] /= 2.0
    data.loc[data["x_cat_2"] == "cat", "y"] *= -2.0
    data.loc[data["x_cat_2"] == "dog", "y"] /= -5.0
    data["valid_y"] = 1

    categorical_kernel = HammingDistanceKernel(
        ard=True,
        features=["x_cat_1", "x_cat_2"],
    )
    continuous_kernel = RBFKernel(
        ard=True,
        lengthscale_prior=HVARFNER_LENGTHSCALE_PRIOR(),
        features=[f"x_{i+1}" for i in range(2)],
    )
    surrogate_spec = SingleTaskGPSurrogate(
        inputs=inputs,
        outputs=outputs,
        kernel=AdditiveKernel(kernels=[categorical_kernel, continuous_kernel]),
    )

    surrogate = surrogates.map(surrogate_spec)
    assert hasattr(surrogate, "fit")
    assert len(surrogate.kernel.kernels) == 2
    assert surrogate.kernel.kernels[0].features == ["x_cat_1", "x_cat_2"]
    assert surrogate.kernel.kernels[1].features == ["x_1", "x_2"]

    surrogate.fit(data)
    pred = surrogate.predict(data)
    assert pred.shape == (10, 2)

    squared_error = (pred['y_pred'] - data['y']) ** 2
    assert squared_error.mean() < 0.5

    # NOTE(review): presumably the two binary categoricals occupy columns
    # 2-5 after encoding and the continuous inputs columns 0-1 - confirm.
    fitted_kernels = surrogate.model.covar_module.kernels
    assert fitted_kernels[0].active_dims.tolist() == [2, 3, 4, 5]
    assert fitted_kernels[1].active_dims.tolist() == [0, 1]