From ebbe0ba9aa66d29719a2ad0dfb4bf0819db26488 Mon Sep 17 00:00:00 2001 From: Sam Daulton Date: Wed, 23 Aug 2023 09:46:25 -0700 Subject: [PATCH] fix twisted import structure in botorch.acquisition.utils (#1986) Summary: Pull Request resolved: https://github.com/pytorch/botorch/pull/1986 X-link: https://github.com/facebook/Ax/pull/1783 X-link: https://github.com/facebookexternal/botorch_fb/pull/14 See title. This fixes a twisted import structure where botorch.acquisition.monte_carlo imports from utils and utils imports botorch.acquisition.monte_carlo Reviewed By: esantorella Differential Revision: D48329866 fbshipit-source-id: 13f311a196e4ab2cbe88dbddf35a9a926428eae7 --- botorch/acquisition/__init__.py | 2 +- botorch/acquisition/factory.py | 252 +++++++++ botorch/acquisition/utils.py | 268 +--------- botorch/exceptions/errors.py | 5 + botorch/optim/initializers.py | 33 +- sphinx/source/acquisition.rst | 6 +- test/acquisition/test_factory.py | 845 +++++++++++++++++++++++++++++++ test/acquisition/test_utils.py | 842 +----------------------------- test/exceptions/test_errors.py | 13 +- 9 files changed, 1164 insertions(+), 1102 deletions(-) create mode 100644 botorch/acquisition/factory.py create mode 100644 test/acquisition/test_factory.py diff --git a/botorch/acquisition/__init__.py b/botorch/acquisition/__init__.py index fd0efa77a9..84609b86c7 100644 --- a/botorch/acquisition/__init__.py +++ b/botorch/acquisition/__init__.py @@ -29,6 +29,7 @@ InverseCostWeightedUtility, ) from botorch.acquisition.decoupled import DecoupledAcquisitionFunction +from botorch.acquisition.factory import get_acquisition_function from botorch.acquisition.fixed_feature import FixedFeatureAcquisitionFunction from botorch.acquisition.input_constructors import get_acqf_input_constructor from botorch.acquisition.knowledge_gradient import ( @@ -72,7 +73,6 @@ ) from botorch.acquisition.prior_guided import PriorGuidedAcquisitionFunction from botorch.acquisition.proximal import ProximalAcquisitionFunction -from botorch.acquisition.utils import get_acquisition_function __all__ = [ "AcquisitionFunction", diff --git a/botorch/acquisition/factory.py b/botorch/acquisition/factory.py new file mode 100644 index 0000000000..870448ae60 --- /dev/null +++ b/botorch/acquisition/factory.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +r""" +Utilities for acquisition functions. +""" + +from __future__ import annotations + +from typing import Callable, List, Optional, Union + +import torch +from botorch.acquisition import monte_carlo +from botorch.acquisition.multi_objective import monte_carlo as moo_monte_carlo +from botorch.acquisition.objective import MCAcquisitionObjective, PosteriorTransform +from botorch.acquisition.utils import compute_best_feasible_objective +from botorch.models.model import Model +from botorch.sampling.get_sampler import get_sampler +from botorch.utils.multi_objective.box_decompositions.non_dominated import ( + FastNondominatedPartitioning, + NondominatedPartitioning, +) +from torch import Tensor + + +def get_acquisition_function( + acquisition_function_name: str, + model: Model, + objective: MCAcquisitionObjective, + X_observed: Tensor, + posterior_transform: Optional[PosteriorTransform] = None, + X_pending: Optional[Tensor] = None, + constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, + eta: Optional[Union[Tensor, float]] = 1e-3, + mc_samples: int = 512, + seed: Optional[int] = None, + *, + # optional parameters that are only needed for certain acquisition functions + tau: float = 1e-3, + prune_baseline: bool = True, + marginalize_dim: Optional[int] = None, + cache_root: bool = True, + beta: Optional[float] = None, + ref_point: Union[None, List[float], Tensor] = None, + Y: Optional[Tensor] = None, + alpha: float = 0.0, +) -> monte_carlo.MCAcquisitionFunction: + r"""Convenience function for initializing botorch acquisition functions. + + Args: + acquisition_function_name: Name of the acquisition function. + model: A fitted model. + objective: A MCAcquisitionObjective. + X_observed: A `m1 x d`-dim Tensor of `m1` design points that have + already been observed. + posterior_transform: A PosteriorTransform (optional). + X_pending: A `m2 x d`-dim Tensor of `m2` design points whose evaluation + is pending. + constraints: A list of callables, each mapping a Tensor of dimension + `sample_shape x batch-shape x q x m` to a Tensor of dimension + `sample_shape x batch-shape x q`, where negative values imply + feasibility. Used for all acquisition functions except qSR and qUCB. + eta: The temperature parameter for the sigmoid function used for the + differentiable approximation of the constraints. In case of a float the + same eta is used for every constraint in constraints. In case of a + tensor the length of the tensor must match the number of provided + constraints. The i-th constraint is then estimated with the i-th + eta value. Used for all acquisition functions except qSR and qUCB. + mc_samples: The number of samples to use for (q)MC evaluation of the + acquisition function. + seed: If provided, perform deterministic optimization (i.e. the + function to optimize is fixed and not stochastic). + + Returns: + The requested acquisition function. + + Example: + >>> model = SingleTaskGP(train_X, train_Y) + >>> obj = LinearMCObjective(weights=torch.tensor([1.0, 2.0])) + >>> acqf = get_acquisition_function("qEI", model, obj, train_X) + """ + # initialize the sampler + sampler = get_sampler( + posterior=model.posterior(X_observed[:1]), + sample_shape=torch.Size([mc_samples]), + seed=seed, + ) + if posterior_transform is not None and acquisition_function_name in [ + "qEHVI", + "qNEHVI", + ]: + raise NotImplementedError( + "PosteriorTransforms are not yet implemented for multi-objective " + "acquisition functions." + ) + # instantiate and return the requested acquisition function + if acquisition_function_name in ("qEI", "qLogEI", "qPI"): + # Since these are the non-noisy variants, use the posterior mean at the observed + # inputs directly to compute the best feasible value without sampling. + Y = model.posterior(X_observed, posterior_transform=posterior_transform).mean + obj = objective(samples=Y, X=X_observed) + best_f = compute_best_feasible_objective( + samples=Y, + obj=obj, + constraints=constraints, + model=model, + objective=objective, + posterior_transform=posterior_transform, + X_baseline=X_observed, + ) + if acquisition_function_name == "qEI": + return monte_carlo.qExpectedImprovement( + model=model, + best_f=best_f, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + constraints=constraints, + eta=eta, + ) + if acquisition_function_name == "qLogEI": + # putting the import here to avoid circular imports + # ideally, the entire function should be moved out of this file, + # but since it is used for legacy code to be deprecated, we keep it here. + from botorch.acquisition.logei import qLogExpectedImprovement + + return qLogExpectedImprovement( + model=model, + best_f=best_f, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + constraints=constraints, + eta=eta, + ) + elif acquisition_function_name == "qPI": + return monte_carlo.qProbabilityOfImprovement( + model=model, + best_f=best_f, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + tau=tau, + constraints=constraints, + eta=eta, + ) + elif acquisition_function_name == "qNEI": + return monte_carlo.qNoisyExpectedImprovement( + model=model, + X_baseline=X_observed, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + prune_baseline=prune_baseline, + marginalize_dim=marginalize_dim, + cache_root=cache_root, + constraints=constraints, + eta=eta, + ) + elif acquisition_function_name == "qLogNEI": + from botorch.acquisition.logei import qLogNoisyExpectedImprovement + + return qLogNoisyExpectedImprovement( + model=model, + X_baseline=X_observed, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + prune_baseline=prune_baseline, + marginalize_dim=marginalize_dim, + cache_root=cache_root, + constraints=constraints, + eta=eta, + ) + elif acquisition_function_name == "qSR": + return monte_carlo.qSimpleRegret( + model=model, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + ) + elif acquisition_function_name == "qUCB": + if beta is None: + raise ValueError("`beta` must be not be None for qUCB.") + return monte_carlo.qUpperConfidenceBound( + model=model, + beta=beta, + sampler=sampler, + objective=objective, + posterior_transform=posterior_transform, + X_pending=X_pending, + ) + elif acquisition_function_name == "qEHVI": + if Y is None: + raise ValueError("`Y` must not be None for qEHVI") + if ref_point is None: + raise ValueError("`ref_point` must not be None for qEHVI") + # get feasible points + if constraints is not None: + feas = torch.stack([c(Y) <= 0 for c in constraints], dim=-1).all(dim=-1) + Y = Y[feas] + obj = objective(Y) + if alpha > 0: + partitioning = NondominatedPartitioning( + ref_point=torch.as_tensor(ref_point, dtype=Y.dtype, device=Y.device), + Y=obj, + alpha=alpha, + ) + else: + partitioning = FastNondominatedPartitioning( + ref_point=torch.as_tensor(ref_point, dtype=Y.dtype, device=Y.device), + Y=obj, + ) + return moo_monte_carlo.qExpectedHypervolumeImprovement( + model=model, + ref_point=ref_point, + partitioning=partitioning, + sampler=sampler, + objective=objective, + constraints=constraints, + eta=eta, + X_pending=X_pending, + ) + elif acquisition_function_name == "qNEHVI": + if ref_point is None: + raise ValueError("`ref_point` must not be None for qNEHVI") + return moo_monte_carlo.qNoisyExpectedHypervolumeImprovement( + model=model, + ref_point=ref_point, + X_baseline=X_observed, + sampler=sampler, + objective=objective, + constraints=constraints, + eta=eta, + prune_baseline=prune_baseline, + alpha=alpha, + X_pending=X_pending, + marginalize_dim=marginalize_dim, + cache_root=cache_root, + ) + raise NotImplementedError( + f"Unknown acquisition function {acquisition_function_name}" + ) diff --git a/botorch/acquisition/utils.py b/botorch/acquisition/utils.py index 0dd34ce931..e703bc5969 100644 --- a/botorch/acquisition/utils.py +++ b/botorch/acquisition/utils.py @@ -11,256 +11,29 @@ from __future__ import annotations import math -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple import torch -from botorch.acquisition import analytic, monte_carlo, multi_objective # noqa F401 -from botorch.acquisition.acquisition import AcquisitionFunction -from botorch.acquisition.multi_objective import monte_carlo as moo_monte_carlo from botorch.acquisition.objective import ( IdentityMCObjective, MCAcquisitionObjective, PosteriorTransform, ) -from botorch.exceptions.errors import UnsupportedError +from botorch.exceptions.errors import DeprecationError, UnsupportedError from botorch.models.fully_bayesian import MCMC_DIM from botorch.models.model import Model from botorch.sampling.base import MCSampler from botorch.sampling.get_sampler import get_sampler from botorch.sampling.pathwise import draw_matheron_paths -from botorch.utils.multi_objective.box_decompositions.non_dominated import ( - FastNondominatedPartitioning, - NondominatedPartitioning, -) from botorch.utils.objective import compute_feasibility_indicator from botorch.utils.sampling import optimize_posterior_samples from botorch.utils.transforms import is_fully_bayesian from torch import Tensor -def get_acquisition_function( - acquisition_function_name: str, - model: Model, - objective: MCAcquisitionObjective, - X_observed: Tensor, - posterior_transform: Optional[PosteriorTransform] = None, - X_pending: Optional[Tensor] = None, - constraints: Optional[List[Callable[[Tensor], Tensor]]] = None, - eta: Optional[Union[Tensor, float]] = 1e-3, - mc_samples: int = 512, - seed: Optional[int] = None, - *, - # optional parameters that are only needed for certain acquisition functions - tau: float = 1e-3, - prune_baseline: bool = True, - marginalize_dim: Optional[int] = None, - cache_root: bool = True, - beta: Optional[float] = None, - ref_point: Union[None, List[float], Tensor] = None, - Y: Optional[Tensor] = None, - alpha: float = 0.0, -) -> monte_carlo.MCAcquisitionFunction: - r"""Convenience function for initializing botorch acquisition functions. - - Args: - acquisition_function_name: Name of the acquisition function. - model: A fitted model. - objective: A MCAcquisitionObjective. - X_observed: A `m1 x d`-dim Tensor of `m1` design points that have - already been observed. - posterior_transform: A PosteriorTransform (optional). - X_pending: A `m2 x d`-dim Tensor of `m2` design points whose evaluation - is pending. - constraints: A list of callables, each mapping a Tensor of dimension - `sample_shape x batch-shape x q x m` to a Tensor of dimension - `sample_shape x batch-shape x q`, where negative values imply - feasibility. Used for all acquisition functions except qSR and qUCB. - eta: The temperature parameter for the sigmoid function used for the - differentiable approximation of the constraints. In case of a float the - same eta is used for every constraint in constraints. In case of a - tensor the length of the tensor must match the number of provided - constraints. The i-th constraint is then estimated with the i-th - eta value. Used for all acquisition functions except qSR and qUCB. - mc_samples: The number of samples to use for (q)MC evaluation of the - acquisition function. - seed: If provided, perform deterministic optimization (i.e. the - function to optimize is fixed and not stochastic). - - Returns: - The requested acquisition function. - - Example: - >>> model = SingleTaskGP(train_X, train_Y) - >>> obj = LinearMCObjective(weights=torch.tensor([1.0, 2.0])) - >>> acqf = get_acquisition_function("qEI", model, obj, train_X) - """ - # initialize the sampler - sampler = get_sampler( - posterior=model.posterior(X_observed[:1]), - sample_shape=torch.Size([mc_samples]), - seed=seed, - ) - if posterior_transform is not None and acquisition_function_name in [ - "qEHVI", - "qNEHVI", - ]: - raise NotImplementedError( - "PosteriorTransforms are not yet implemented for multi-objective " - "acquisition functions." - ) - # instantiate and return the requested acquisition function - if acquisition_function_name in ("qEI", "qLogEI", "qPI"): - # Since these are the non-noisy variants, use the posterior mean at the observed - # inputs directly to compute the best feasible value without sampling. - Y = model.posterior(X_observed, posterior_transform=posterior_transform).mean - obj = objective(samples=Y, X=X_observed) - best_f = compute_best_feasible_objective( - samples=Y, - obj=obj, - constraints=constraints, - model=model, - objective=objective, - posterior_transform=posterior_transform, - X_baseline=X_observed, - ) - if acquisition_function_name == "qEI": - return monte_carlo.qExpectedImprovement( - model=model, - best_f=best_f, - sampler=sampler, - objective=objective, - posterior_transform=posterior_transform, - X_pending=X_pending, - constraints=constraints, - eta=eta, - ) - if acquisition_function_name == "qLogEI": - # putting the import here to avoid circular imports - # ideally, the entire function should be moved out of this file, - # but since it is used for legacy code to be deprecated, we keep it here. - from botorch.acquisition.logei import qLogExpectedImprovement - - return qLogExpectedImprovement( - model=model, - best_f=best_f, - sampler=sampler, - objective=objective, - posterior_transform=posterior_transform, - X_pending=X_pending, - constraints=constraints, - eta=eta, - ) - elif acquisition_function_name == "qPI": - return monte_carlo.qProbabilityOfImprovement( - model=model, - best_f=best_f, - sampler=sampler, - objective=objective, - posterior_transform=posterior_transform, - X_pending=X_pending, - tau=tau, - constraints=constraints, - eta=eta, - ) - elif acquisition_function_name == "qNEI": - return monte_carlo.qNoisyExpectedImprovement( - model=model, - X_baseline=X_observed, - sampler=sampler, - objective=objective, - posterior_transform=posterior_transform, - X_pending=X_pending, - prune_baseline=prune_baseline, - marginalize_dim=marginalize_dim, - cache_root=cache_root, - constraints=constraints, - eta=eta, - ) - elif acquisition_function_name == "qLogNEI": - from botorch.acquisition.logei import qLogNoisyExpectedImprovement - - return qLogNoisyExpectedImprovement( - model=model, - X_baseline=X_observed, - sampler=sampler, - objective=objective, - posterior_transform=posterior_transform, - X_pending=X_pending, - prune_baseline=prune_baseline, - marginalize_dim=marginalize_dim, - cache_root=cache_root, - constraints=constraints, - eta=eta, - ) - elif acquisition_function_name == "qSR": - return monte_carlo.qSimpleRegret( - model=model, - sampler=sampler, - objective=objective, - posterior_transform=posterior_transform, - X_pending=X_pending, - ) - elif acquisition_function_name == "qUCB": - if beta is None: - raise ValueError("`beta` must be not be None for qUCB.") - return monte_carlo.qUpperConfidenceBound( - model=model, - beta=beta, - sampler=sampler, - objective=objective, - posterior_transform=posterior_transform, - X_pending=X_pending, - ) - elif acquisition_function_name == "qEHVI": - if Y is None: - raise ValueError("`Y` must not be None for qEHVI") - if ref_point is None: - raise ValueError("`ref_point` must not be None for qEHVI") - # get feasible points - if constraints is not None: - feas = torch.stack([c(Y) <= 0 for c in constraints], dim=-1).all(dim=-1) - Y = Y[feas] - obj = objective(Y) - if alpha > 0: - partitioning = NondominatedPartitioning( - ref_point=torch.as_tensor(ref_point, dtype=Y.dtype, device=Y.device), - Y=obj, - alpha=alpha, - ) - else: - partitioning = FastNondominatedPartitioning( - ref_point=torch.as_tensor(ref_point, dtype=Y.dtype, device=Y.device), - Y=obj, - ) - return moo_monte_carlo.qExpectedHypervolumeImprovement( - model=model, - ref_point=ref_point, - partitioning=partitioning, - sampler=sampler, - objective=objective, - constraints=constraints, - eta=eta, - X_pending=X_pending, - ) - elif acquisition_function_name == "qNEHVI": - if ref_point is None: - raise ValueError("`ref_point` must not be None for qNEHVI") - return moo_monte_carlo.qNoisyExpectedHypervolumeImprovement( - model=model, - ref_point=ref_point, - X_baseline=X_observed, - sampler=sampler, - objective=objective, - constraints=constraints, - eta=eta, - prune_baseline=prune_baseline, - alpha=alpha, - X_pending=X_pending, - marginalize_dim=marginalize_dim, - cache_root=cache_root, - ) - raise NotImplementedError( - f"Unknown acquisition function {acquisition_function_name}" +def get_acquisition_function(*args, **kwargs) -> None: + raise DeprecationError( + "`get_acquisition_function` has been moved to `botorch.acquisition.factory`." ) @@ -416,37 +189,6 @@ def objective(Y: Tensor, X: Optional[Tensor] = None): return -(lb.clamp_max(0.0)) -def is_nonnegative(acq_function: AcquisitionFunction) -> bool: - r"""Determine whether a given acquisition function is non-negative. - - Args: - acq_function: The `AcquisitionFunction` instance. - - Returns: - True if `acq_function` is non-negative, False if not, or if the behavior - is unknown (for custom acquisition functions). - - Example: - >>> qEI = qExpectedImprovement(model, best_f=0.1) - >>> is_nonnegative(qEI) # returns True - """ - return isinstance( - acq_function, - ( - analytic.ExpectedImprovement, - analytic.ConstrainedExpectedImprovement, - analytic.ProbabilityOfImprovement, - analytic.NoisyExpectedImprovement, - monte_carlo.qExpectedImprovement, - monte_carlo.qNoisyExpectedImprovement, - monte_carlo.qProbabilityOfImprovement, - multi_objective.analytic.ExpectedHypervolumeImprovement, - multi_objective.monte_carlo.qExpectedHypervolumeImprovement, - multi_objective.monte_carlo.qNoisyExpectedHypervolumeImprovement, - ), - ) - - def prune_inferior_points( model: Model, X: Tensor, diff --git a/botorch/exceptions/errors.py b/botorch/exceptions/errors.py index 9b26692932..5a41042c75 100644 --- a/botorch/exceptions/errors.py +++ b/botorch/exceptions/errors.py @@ -25,6 +25,11 @@ class CandidateGenerationError(BotorchError): pass +class DeprecationError(BotorchError): + r"""Exception raised due to deprecations""" + pass + + class InputDataError(BotorchError): r"""Exception raised when input data does not comply with conventions.""" diff --git a/botorch/optim/initializers.py b/botorch/optim/initializers.py index c89b722098..f7027ac8b9 100644 --- a/botorch/optim/initializers.py +++ b/botorch/optim/initializers.py @@ -20,12 +20,12 @@ import torch from botorch import settings +from botorch.acquisition import analytic, monte_carlo, multi_objective from botorch.acquisition.acquisition import AcquisitionFunction from botorch.acquisition.knowledge_gradient import ( _get_value_function, qKnowledgeGradient, ) -from botorch.acquisition.utils import is_nonnegative from botorch.exceptions.errors import BotorchTensorDimensionError, UnsupportedError from botorch.exceptions.warnings import ( BadInitialCandidatesWarning, @@ -1039,3 +1039,34 @@ def sample_perturbed_subset_dims( # Create candidate points X_cand[mask] = pert[mask] return X_cand + + +def is_nonnegative(acq_function: AcquisitionFunction) -> bool: + r"""Determine whether a given acquisition function is non-negative. + + Args: + acq_function: The `AcquisitionFunction` instance. + + Returns: + True if `acq_function` is non-negative, False if not, or if the behavior + is unknown (for custom acquisition functions). + + Example: + >>> qEI = qExpectedImprovement(model, best_f=0.1) + >>> is_nonnegative(qEI) # returns True + """ + return isinstance( + acq_function, + ( + analytic.ExpectedImprovement, + analytic.ConstrainedExpectedImprovement, + analytic.ProbabilityOfImprovement, + analytic.NoisyExpectedImprovement, + monte_carlo.qExpectedImprovement, + monte_carlo.qNoisyExpectedImprovement, + monte_carlo.qProbabilityOfImprovement, + multi_objective.analytic.ExpectedHypervolumeImprovement, + multi_objective.monte_carlo.qExpectedHypervolumeImprovement, + multi_objective.monte_carlo.qNoisyExpectedHypervolumeImprovement, + ), + ) diff --git a/sphinx/source/acquisition.rst b/sphinx/source/acquisition.rst index c5e8cd5a5c..0e4d72fa7d 100644 --- a/sphinx/source/acquisition.rst +++ b/sphinx/source/acquisition.rst @@ -193,12 +193,16 @@ Proximal Acquisition Function Wrapper .. automodule:: botorch.acquisition.proximal :members: +Factory Functions for Acquisition Functions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +.. automodule:: botorch.acquisition.factory + :members: + General Utilities for Acquisition Functions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. automodule:: botorch.acquisition.utils :members: - Multi-Objective Utilities for Acquisition Functions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. automodule:: botorch.acquisition.multi_objective.utils diff --git a/test/acquisition/test_factory.py b/test/acquisition/test_factory.py new file mode 100644 index 0000000000..d739ca448e --- /dev/null +++ b/test/acquisition/test_factory.py @@ -0,0 +1,845 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import math +from unittest import mock + +import torch +from botorch.acquisition import logei, monte_carlo +from botorch.acquisition.factory import get_acquisition_function +from botorch.acquisition.multi_objective import ( + MCMultiOutputObjective, + monte_carlo as moo_monte_carlo, +) +from botorch.acquisition.objective import ( + MCAcquisitionObjective, + ScalarizedPosteriorTransform, +) +from botorch.acquisition.utils import compute_best_feasible_objective +from botorch.utils.multi_objective.box_decompositions.non_dominated import ( + FastNondominatedPartitioning, + NondominatedPartitioning, +) +from botorch.utils.testing import BotorchTestCase, MockModel, MockPosterior +from gpytorch.distributions import MultivariateNormal +from torch import Tensor + + +class DummyMCObjective(MCAcquisitionObjective): + def forward(self, samples: Tensor, X=None) -> Tensor: + return samples.sum(-1) + + +class DummyMCMultiOutputObjective(MCMultiOutputObjective): + def forward(self, samples: Tensor, X=None) -> Tensor: + return samples + + +class TestGetAcquisitionFunction(BotorchTestCase): + def setUp(self): + super().setUp() + self.model = MockModel(MockPosterior()) + self.objective = DummyMCObjective() + self.X_observed = torch.tensor([[1.0, 2.0, 3.0], [2.0, 3.0, 4.0]]) + self.X_pending = torch.tensor([[1.0, 3.0, 4.0]]) + self.mc_samples = 250 + self.qmc = True + self.ref_point = [0.0, 0.0] + self.mo_objective = DummyMCMultiOutputObjective() + self.Y = torch.tensor([[1.0, 2.0]]) # (2 x 1)-dim multi-objective outcomes + self.seed = 1 + + @mock.patch(f"{monte_carlo.__name__}.qExpectedImprovement") + def test_GetQEI(self, mock_acqf): + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + common_kwargs = { + "model": self.model, + "objective": self.objective, + "X_observed": self.X_observed, + "X_pending": self.X_pending, + "mc_samples": self.mc_samples, + "seed": self.seed, + } + acqf = get_acquisition_function( + acquisition_function_name="qEI", + **common_kwargs, + marginalize_dim=0, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_f = self.objective(self.model.posterior(self.X_observed).mean).max().item() + mock_acqf.assert_called_once_with( + model=self.model, + best_f=best_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + constraints=None, + eta=1e-3, + ) + # test batched model + self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2, 1))) + common_kwargs.update({"model": self.model}) + acqf = get_acquisition_function( + acquisition_function_name="qEI", **common_kwargs + ) + self.assertEqual(acqf, mock_acqf.return_value) + # test batched model without marginalize dim + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + + # test w/ posterior transform + pm = torch.tensor([1.0, 2.0]) + mvn = MultivariateNormal(pm, torch.eye(2)) + self.model._posterior.distribution = mvn + self.model._posterior._mean = pm.unsqueeze(-1) + common_kwargs.update({"model": self.model}) + pt = ScalarizedPosteriorTransform(weights=torch.tensor([-1])) + acqf = get_acquisition_function( + acquisition_function_name="qEI", + **common_kwargs, + posterior_transform=pt, + marginalize_dim=0, + ) + self.assertEqual(mock_acqf.call_args[-1]["best_f"].item(), -1.0) + + # with constraints + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + + acqf = get_acquisition_function( + acquisition_function_name="qEI", + **common_kwargs, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_feasible_f = compute_best_feasible_objective( + samples=mean, + obj=self.objective(mean), + constraints=constraints, + model=self.model, + objective=self.objective, + X_baseline=self.X_observed, + ) + mock_acqf.assert_called_with( + model=self.model, + best_f=best_feasible_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + constraints=constraints, + eta=eta, + ) + + @mock.patch(f"{logei.__name__}.qLogExpectedImprovement") + def test_GetQLogEI(self, mock_acqf): + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + common_kwargs = { + "model": self.model, + "objective": self.objective, + "X_observed": self.X_observed, + "X_pending": self.X_pending, + "mc_samples": self.mc_samples, + "seed": self.seed, + } + acqf = get_acquisition_function( + acquisition_function_name="qLogEI", + **common_kwargs, + marginalize_dim=0, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_f = self.objective(self.model.posterior(self.X_observed).mean).max().item() + mock_acqf.assert_called_once_with( + model=self.model, + best_f=best_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + constraints=None, + eta=1e-3, + ) + # test batched model + self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2, 1))) + common_kwargs.update({"model": self.model}) + acqf = get_acquisition_function( + acquisition_function_name="qLogEI", **common_kwargs + ) + self.assertEqual(acqf, mock_acqf.return_value) + # test batched model without marginalize dim + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + + # test w/ posterior transform + pm = torch.tensor([1.0, 2.0]) + mvn = MultivariateNormal(pm, torch.eye(2)) + self.model._posterior.distribution = mvn + self.model._posterior._mean = pm.unsqueeze(-1) + common_kwargs.update({"model": self.model}) + pt = ScalarizedPosteriorTransform(weights=torch.tensor([-1])) + acqf = get_acquisition_function( + acquisition_function_name="qLogEI", + **common_kwargs, + posterior_transform=pt, + marginalize_dim=0, + ) + self.assertEqual(mock_acqf.call_args[-1]["best_f"].item(), -1.0) + + # with constraints + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + + acqf = get_acquisition_function( + acquisition_function_name="qLogEI", + **common_kwargs, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_feasible_f = compute_best_feasible_objective( + samples=mean, + obj=self.objective(mean), + constraints=constraints, + model=self.model, + objective=self.objective, + X_baseline=self.X_observed, + ) + mock_acqf.assert_called_with( + model=self.model, + best_f=best_feasible_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + constraints=constraints, + eta=eta, + ) + + @mock.patch(f"{monte_carlo.__name__}.qProbabilityOfImprovement") + def test_GetQPI(self, mock_acqf): + # basic test + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + acqf = get_acquisition_function( + acquisition_function_name="qPI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_f = self.objective(self.model.posterior(self.X_observed).mean).max().item() + mock_acqf.assert_called_once_with( + model=self.model, + best_f=best_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + tau=1e-3, + constraints=None, + eta=1e-3, + ) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + # test with different tau, non-qmc + acqf = get_acquisition_function( + acquisition_function_name="qPI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + tau=1.0, + ) + self.assertEqual(mock_acqf.call_count, 2) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertEqual(kwargs["tau"], 1.0) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 2) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + acqf = get_acquisition_function( + acquisition_function_name="qPI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + tau=1.0, + ) + # test batched model + self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2, 1))) + acqf = get_acquisition_function( + acquisition_function_name="qPI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ) + self.assertEqual(acqf, mock_acqf.return_value) + + # with constraints + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + acqf = get_acquisition_function( + acquisition_function_name="qPI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + best_feasible_f = compute_best_feasible_objective( + samples=mean, + obj=self.objective(mean), + constraints=constraints, + model=self.model, + objective=self.objective, + X_baseline=self.X_observed, + ) + mock_acqf.assert_called_with( + model=self.model, + best_f=best_feasible_f, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + tau=1e-3, + constraints=constraints, + eta=eta, + ) + + @mock.patch(f"{monte_carlo.__name__}.qNoisyExpectedImprovement") + def test_GetQNEI(self, mock_acqf): + # basic test + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + common_kwargs = { + "model": self.model, + "objective": self.objective, + "X_observed": self.X_observed, + "X_pending": self.X_pending, + "mc_samples": self.mc_samples, + "seed": self.seed, + } + acqf = get_acquisition_function( + acquisition_function_name="qNEI", + **common_kwargs, + marginalize_dim=0, + ) + self.assertEqual(acqf, mock_acqf.return_value) + self.assertEqual(mock_acqf.call_count, 1) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertEqual(kwargs["marginalize_dim"], 0) + self.assertEqual(kwargs["cache_root"], True) + # test with cache_root = False + acqf = get_acquisition_function( + acquisition_function_name="qNEI", + **common_kwargs, + marginalize_dim=0, + cache_root=False, + ) + self.assertEqual(acqf, mock_acqf.return_value) + self.assertEqual(mock_acqf.call_count, 2) + args, kwargs = mock_acqf.call_args + self.assertEqual(kwargs["cache_root"], False) + # test with non-qmc, no X_pending + common_kwargs.update({"X_pending": None}) + acqf = get_acquisition_function( + acquisition_function_name="qNEI", + **common_kwargs, + ) + self.assertEqual(mock_acqf.call_count, 3) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) + self.assertEqual(kwargs["X_pending"], None) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) + + # with constraints + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + common_kwargs.update({"X_pending": self.X_pending}) + acqf = get_acquisition_function( + acquisition_function_name="qNEI", + **common_kwargs, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_with( + model=self.model, + X_baseline=self.X_observed, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + prune_baseline=True, + marginalize_dim=0, + cache_root=True, + constraints=constraints, + eta=eta, + ) + + @mock.patch(f"{logei.__name__}.qLogNoisyExpectedImprovement") + def test_GetQLogNEI(self, mock_acqf): + # basic test + n = len(self.X_observed) + mean = torch.arange(n, dtype=torch.double).view(-1, 1) + var = torch.ones_like(mean) + self.model = MockModel(MockPosterior(mean=mean, variance=var)) + common_kwargs = { + "model": self.model, + "objective": self.objective, + "X_observed": self.X_observed, + "X_pending": self.X_pending, + "mc_samples": self.mc_samples, + "seed": self.seed, + } + acqf = get_acquisition_function( + acquisition_function_name="qLogNEI", + **common_kwargs, + marginalize_dim=0, + ) + self.assertEqual(acqf, mock_acqf.return_value) + self.assertEqual(mock_acqf.call_count, 1) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertEqual(kwargs["marginalize_dim"], 0) + self.assertEqual(kwargs["cache_root"], True) + # test with cache_root = False + acqf = get_acquisition_function( + acquisition_function_name="qLogNEI", + **common_kwargs, + marginalize_dim=0, + cache_root=False, + ) + self.assertEqual(acqf, mock_acqf.return_value) + self.assertEqual(mock_acqf.call_count, 2) + args, kwargs = mock_acqf.call_args + self.assertEqual(kwargs["cache_root"], False) + # test with non-qmc, no X_pending + common_kwargs.update({"X_pending": None}) + acqf = get_acquisition_function( + acquisition_function_name="qLogNEI", + **common_kwargs, + ) + self.assertEqual(mock_acqf.call_count, 3) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) + self.assertEqual(kwargs["X_pending"], None) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) + + # with constraints + upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 + constraints = [lambda samples: samples[..., 0] - upper_bound] + eta = math.pi * 1e-2 # testing non-standard eta + common_kwargs.update({"X_pending": self.X_pending}) + acqf = get_acquisition_function( + acquisition_function_name="qLogNEI", + **common_kwargs, + marginalize_dim=0, + constraints=constraints, + eta=eta, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_with( + model=self.model, + X_baseline=self.X_observed, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + prune_baseline=True, + marginalize_dim=0, + cache_root=True, + constraints=constraints, + eta=eta, + ) + + @mock.patch(f"{monte_carlo.__name__}.qSimpleRegret") + def test_GetQSR(self, mock_acqf): + # basic test + acqf = get_acquisition_function( + acquisition_function_name="qSR", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_once_with( + model=self.model, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + ) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + # test with non-qmc + acqf = get_acquisition_function( + acquisition_function_name="qSR", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + ) + self.assertEqual(mock_acqf.call_count, 2) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 2) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + + @mock.patch(f"{monte_carlo.__name__}.qUpperConfidenceBound") + def test_GetQUCB(self, mock_acqf): + # make sure beta is specified + with self.assertRaises(ValueError): + acqf = get_acquisition_function( + acquisition_function_name="qUCB", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ) + acqf = get_acquisition_function( + acquisition_function_name="qUCB", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + beta=0.3, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_once_with( + model=self.model, + beta=0.3, + sampler=mock.ANY, + objective=self.objective, + posterior_transform=None, + X_pending=self.X_pending, + ) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + # test with different tau, non-qmc + acqf = get_acquisition_function( + acquisition_function_name="qUCB", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + beta=0.2, + ) + self.assertEqual(mock_acqf.call_count, 2) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertEqual(kwargs["beta"], 0.2) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 2) + self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) + + @mock.patch(f"{moo_monte_carlo.__name__}.qExpectedHypervolumeImprovement") + def test_GetQEHVI(self, mock_acqf): + # make sure ref_point is specified + with self.assertRaises(ValueError): + acqf = get_acquisition_function( + acquisition_function_name="qEHVI", + model=self.model, + objective=self.mo_objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + Y=self.Y, + ) + # make sure Y is specified + with self.assertRaises(ValueError): + acqf = get_acquisition_function( + acquisition_function_name="qEHVI", + model=self.model, + objective=self.mo_objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ref_point=self.ref_point, + ) + # posterior transforms are not supported + with self.assertRaises(NotImplementedError): + acqf = get_acquisition_function( + acquisition_function_name="qEHVI", + model=self.model, + objective=self.mo_objective, + posterior_transform=ScalarizedPosteriorTransform(weights=torch.rand(2)), + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ref_point=self.ref_point, + ) + acqf = get_acquisition_function( + acquisition_function_name="qEHVI", + model=self.model, + objective=self.mo_objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ref_point=self.ref_point, + Y=self.Y, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_once_with( + constraints=None, + eta=1e-3, + model=self.model, + objective=self.mo_objective, + ref_point=self.ref_point, + partitioning=mock.ANY, + sampler=mock.ANY, + X_pending=self.X_pending, + ) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + + acqf = get_acquisition_function( + acquisition_function_name="qEHVI", + model=self.model, + objective=self.mo_objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + ref_point=self.ref_point, + Y=self.Y, + ) + self.assertEqual(mock_acqf.call_count, 2) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertEqual(kwargs["ref_point"], self.ref_point) + sampler = kwargs["sampler"] + self.assertIsInstance(kwargs["objective"], DummyMCMultiOutputObjective) + partitioning = kwargs["partitioning"] + self.assertIsInstance(partitioning, FastNondominatedPartitioning) + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 2) + # test that approximate partitioning is used when alpha > 0 + acqf = get_acquisition_function( + acquisition_function_name="qEHVI", + model=self.model, + objective=self.mo_objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + ref_point=self.ref_point, + Y=self.Y, + alpha=0.1, + ) + _, kwargs = mock_acqf.call_args + partitioning = kwargs["partitioning"] + self.assertIsInstance(partitioning, NondominatedPartitioning) + self.assertEqual(partitioning.alpha, 0.1) + # test constraints + acqf = get_acquisition_function( + acquisition_function_name="qEHVI", + model=self.model, + objective=self.mo_objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + constraints=[lambda Y: Y[..., -1]], + eta=1e-2, + seed=2, + ref_point=self.ref_point, + Y=self.Y, + ) + _, kwargs = mock_acqf.call_args + partitioning = kwargs["partitioning"] + self.assertEqual(partitioning.pareto_Y.shape[0], 0) + self.assertEqual(kwargs["eta"], 1e-2) + + @mock.patch(f"{moo_monte_carlo.__name__}.qNoisyExpectedHypervolumeImprovement") + def test_GetQNEHVI(self, mock_acqf): + # make sure ref_point is specified + with self.assertRaises(ValueError): + acqf = get_acquisition_function( + acquisition_function_name="qNEHVI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ) + acqf = get_acquisition_function( + acquisition_function_name="qNEHVI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ref_point=self.ref_point, + ) + self.assertEqual(acqf, mock_acqf.return_value) + mock_acqf.assert_called_once_with( + constraints=None, + eta=1e-3, + model=self.model, + X_baseline=self.X_observed, + objective=self.objective, + ref_point=self.ref_point, + sampler=mock.ANY, + prune_baseline=True, + alpha=0.0, + X_pending=self.X_pending, + marginalize_dim=None, + cache_root=True, + ) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + sampler = kwargs["sampler"] + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 1) + # test with non-qmc + acqf = get_acquisition_function( + acquisition_function_name="qNEHVI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + ref_point=self.ref_point, + ) + self.assertEqual(mock_acqf.call_count, 2) + args, kwargs = mock_acqf.call_args + self.assertEqual(args, ()) + self.assertEqual(kwargs["ref_point"], self.ref_point) + sampler = kwargs["sampler"] + ref_point = kwargs["ref_point"] + self.assertEqual(ref_point, self.ref_point) + self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) + self.assertEqual(sampler.seed, 2) + + # test passing alpha + acqf = get_acquisition_function( + acquisition_function_name="qNEHVI", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=2, + ref_point=self.ref_point, + alpha=0.01, + ) + self.assertEqual(mock_acqf.call_count, 3) + args, kwargs = mock_acqf.call_args + self.assertEqual(kwargs["alpha"], 0.01) + + def test_GetUnknownAcquisitionFunction(self): + with self.assertRaises(NotImplementedError): + get_acquisition_function( + acquisition_function_name="foo", + model=self.model, + objective=self.objective, + X_observed=self.X_observed, + X_pending=self.X_pending, + mc_samples=self.mc_samples, + seed=self.seed, + ) diff --git a/test/acquisition/test_utils.py b/test/acquisition/test_utils.py index 0ec4b748ee..84f6f7d2b0 100644 --- a/test/acquisition/test_utils.py +++ b/test/acquisition/test_utils.py @@ -5,20 +5,10 @@ # LICENSE file in the root directory of this source tree. import itertools -import math from unittest import mock import torch -from botorch.acquisition import logei, monte_carlo -from botorch.acquisition.multi_objective import ( - MCMultiOutputObjective, - monte_carlo as moo_monte_carlo, -) -from botorch.acquisition.objective import ( - GenericMCObjective, - MCAcquisitionObjective, - ScalarizedPosteriorTransform, -) +from botorch.acquisition.objective import GenericMCObjective from botorch.acquisition.utils import ( compute_best_feasible_objective, expand_trace_observations, @@ -29,832 +19,20 @@ project_to_target_fidelity, prune_inferior_points, ) -from botorch.exceptions.errors import UnsupportedError +from botorch.exceptions.errors import DeprecationError, UnsupportedError from botorch.models import SingleTaskGP -from botorch.utils.multi_objective.box_decompositions.non_dominated import ( - FastNondominatedPartitioning, - NondominatedPartitioning, -) -from botorch.utils.testing import BotorchTestCase, MockModel, MockPosterior -from gpytorch.distributions import MultivariateNormal -from torch import Tensor - - -class DummyMCObjective(MCAcquisitionObjective): - def forward(self, samples: Tensor, X=None) -> Tensor: - return samples.sum(-1) - - -class DummyMCMultiOutputObjective(MCMultiOutputObjective): - def forward(self, samples: Tensor, X=None) -> Tensor: - return samples - - -class TestGetAcquisitionFunction(BotorchTestCase): - def setUp(self): - super().setUp() - self.model = MockModel(MockPosterior()) - self.objective = DummyMCObjective() - self.X_observed = torch.tensor([[1.0, 2.0, 3.0], [2.0, 3.0, 4.0]]) - self.X_pending = torch.tensor([[1.0, 3.0, 4.0]]) - self.mc_samples = 250 - self.qmc = True - self.ref_point = [0.0, 0.0] - self.mo_objective = DummyMCMultiOutputObjective() - self.Y = torch.tensor([[1.0, 2.0]]) # (2 x 1)-dim multi-objective outcomes - self.seed = 1 - - @mock.patch(f"{monte_carlo.__name__}.qExpectedImprovement") - def test_GetQEI(self, mock_acqf): - n = len(self.X_observed) - mean = torch.arange(n, dtype=torch.double).view(-1, 1) - var = torch.ones_like(mean) - self.model = MockModel(MockPosterior(mean=mean, variance=var)) - common_kwargs = { - "model": self.model, - "objective": self.objective, - "X_observed": self.X_observed, - "X_pending": self.X_pending, - "mc_samples": self.mc_samples, - "seed": self.seed, - } - acqf = get_acquisition_function( - acquisition_function_name="qEI", - **common_kwargs, - marginalize_dim=0, - ) - self.assertEqual(acqf, mock_acqf.return_value) - best_f = self.objective(self.model.posterior(self.X_observed).mean).max().item() - mock_acqf.assert_called_once_with( - model=self.model, - best_f=best_f, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - constraints=None, - eta=1e-3, - ) - # test batched model - self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2, 1))) - common_kwargs.update({"model": self.model}) - acqf = get_acquisition_function( - acquisition_function_name="qEI", **common_kwargs - ) - self.assertEqual(acqf, mock_acqf.return_value) - # test batched model without marginalize dim - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - - # test w/ posterior transform - pm = torch.tensor([1.0, 2.0]) - mvn = MultivariateNormal(pm, torch.eye(2)) - self.model._posterior.distribution = mvn - self.model._posterior._mean = pm.unsqueeze(-1) - common_kwargs.update({"model": self.model}) - pt = ScalarizedPosteriorTransform(weights=torch.tensor([-1])) - acqf = get_acquisition_function( - acquisition_function_name="qEI", - **common_kwargs, - posterior_transform=pt, - marginalize_dim=0, - ) - self.assertEqual(mock_acqf.call_args[-1]["best_f"].item(), -1.0) - - # with constraints - upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 - constraints = [lambda samples: samples[..., 0] - upper_bound] - eta = math.pi * 1e-2 # testing non-standard eta - - acqf = get_acquisition_function( - acquisition_function_name="qEI", - **common_kwargs, - marginalize_dim=0, - constraints=constraints, - eta=eta, - ) - self.assertEqual(acqf, mock_acqf.return_value) - best_feasible_f = compute_best_feasible_objective( - samples=mean, - obj=self.objective(mean), - constraints=constraints, - model=self.model, - objective=self.objective, - X_baseline=self.X_observed, - ) - mock_acqf.assert_called_with( - model=self.model, - best_f=best_feasible_f, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - constraints=constraints, - eta=eta, - ) - @mock.patch(f"{logei.__name__}.qLogExpectedImprovement") - def test_GetQLogEI(self, mock_acqf): - n = len(self.X_observed) - mean = torch.arange(n, dtype=torch.double).view(-1, 1) - var = torch.ones_like(mean) - self.model = MockModel(MockPosterior(mean=mean, variance=var)) - common_kwargs = { - "model": self.model, - "objective": self.objective, - "X_observed": self.X_observed, - "X_pending": self.X_pending, - "mc_samples": self.mc_samples, - "seed": self.seed, - } - acqf = get_acquisition_function( - acquisition_function_name="qLogEI", - **common_kwargs, - marginalize_dim=0, - ) - self.assertEqual(acqf, mock_acqf.return_value) - best_f = self.objective(self.model.posterior(self.X_observed).mean).max().item() - mock_acqf.assert_called_once_with( - model=self.model, - best_f=best_f, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - constraints=None, - eta=1e-3, - ) - # test batched model - self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2, 1))) - common_kwargs.update({"model": self.model}) - acqf = get_acquisition_function( - acquisition_function_name="qLogEI", **common_kwargs - ) - self.assertEqual(acqf, mock_acqf.return_value) - # test batched model without marginalize dim - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - - # test w/ posterior transform - pm = torch.tensor([1.0, 2.0]) - mvn = MultivariateNormal(pm, torch.eye(2)) - self.model._posterior.distribution = mvn - self.model._posterior._mean = pm.unsqueeze(-1) - common_kwargs.update({"model": self.model}) - pt = ScalarizedPosteriorTransform(weights=torch.tensor([-1])) - acqf = get_acquisition_function( - acquisition_function_name="qLogEI", - **common_kwargs, - posterior_transform=pt, - marginalize_dim=0, - ) - self.assertEqual(mock_acqf.call_args[-1]["best_f"].item(), -1.0) - - # with constraints - upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 - constraints = [lambda samples: samples[..., 0] - upper_bound] - eta = math.pi * 1e-2 # testing non-standard eta - - acqf = get_acquisition_function( - acquisition_function_name="qLogEI", - **common_kwargs, - marginalize_dim=0, - constraints=constraints, - eta=eta, - ) - self.assertEqual(acqf, mock_acqf.return_value) - best_feasible_f = compute_best_feasible_objective( - samples=mean, - obj=self.objective(mean), - constraints=constraints, - model=self.model, - objective=self.objective, - X_baseline=self.X_observed, - ) - mock_acqf.assert_called_with( - model=self.model, - best_f=best_feasible_f, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - constraints=constraints, - eta=eta, - ) - - @mock.patch(f"{monte_carlo.__name__}.qProbabilityOfImprovement") - def test_GetQPI(self, mock_acqf): - # basic test - n = len(self.X_observed) - mean = torch.arange(n, dtype=torch.double).view(-1, 1) - var = torch.ones_like(mean) - self.model = MockModel(MockPosterior(mean=mean, variance=var)) - acqf = get_acquisition_function( - acquisition_function_name="qPI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ) - self.assertEqual(acqf, mock_acqf.return_value) - best_f = self.objective(self.model.posterior(self.X_observed).mean).max().item() - mock_acqf.assert_called_once_with( - model=self.model, - best_f=best_f, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - tau=1e-3, - constraints=None, - eta=1e-3, - ) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - # test with different tau, non-qmc - acqf = get_acquisition_function( - acquisition_function_name="qPI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - tau=1.0, - ) - self.assertEqual(mock_acqf.call_count, 2) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertEqual(kwargs["tau"], 1.0) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 2) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - acqf = get_acquisition_function( - acquisition_function_name="qPI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - tau=1.0, - ) - # test batched model - self.model = MockModel(MockPosterior(mean=torch.zeros(1, 2, 1))) - acqf = get_acquisition_function( - acquisition_function_name="qPI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ) - self.assertEqual(acqf, mock_acqf.return_value) - - # with constraints - n = len(self.X_observed) - mean = torch.arange(n, dtype=torch.double).view(-1, 1) - var = torch.ones_like(mean) - self.model = MockModel(MockPosterior(mean=mean, variance=var)) - upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 - constraints = [lambda samples: samples[..., 0] - upper_bound] - eta = math.pi * 1e-2 # testing non-standard eta - acqf = get_acquisition_function( - acquisition_function_name="qPI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - marginalize_dim=0, - constraints=constraints, - eta=eta, - ) - self.assertEqual(acqf, mock_acqf.return_value) - best_feasible_f = compute_best_feasible_objective( - samples=mean, - obj=self.objective(mean), - constraints=constraints, - model=self.model, - objective=self.objective, - X_baseline=self.X_observed, - ) - mock_acqf.assert_called_with( - model=self.model, - best_f=best_feasible_f, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - tau=1e-3, - constraints=constraints, - eta=eta, - ) - - @mock.patch(f"{monte_carlo.__name__}.qNoisyExpectedImprovement") - def test_GetQNEI(self, mock_acqf): - # basic test - n = len(self.X_observed) - mean = torch.arange(n, dtype=torch.double).view(-1, 1) - var = torch.ones_like(mean) - self.model = MockModel(MockPosterior(mean=mean, variance=var)) - common_kwargs = { - "model": self.model, - "objective": self.objective, - "X_observed": self.X_observed, - "X_pending": self.X_pending, - "mc_samples": self.mc_samples, - "seed": self.seed, - } - acqf = get_acquisition_function( - acquisition_function_name="qNEI", - **common_kwargs, - marginalize_dim=0, - ) - self.assertEqual(acqf, mock_acqf.return_value) - self.assertEqual(mock_acqf.call_count, 1) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertEqual(kwargs["marginalize_dim"], 0) - self.assertEqual(kwargs["cache_root"], True) - # test with cache_root = False - acqf = get_acquisition_function( - acquisition_function_name="qNEI", - **common_kwargs, - marginalize_dim=0, - cache_root=False, - ) - self.assertEqual(acqf, mock_acqf.return_value) - self.assertEqual(mock_acqf.call_count, 2) - args, kwargs = mock_acqf.call_args - self.assertEqual(kwargs["cache_root"], False) - # test with non-qmc, no X_pending - common_kwargs.update({"X_pending": None}) - acqf = get_acquisition_function( - acquisition_function_name="qNEI", - **common_kwargs, - ) - self.assertEqual(mock_acqf.call_count, 3) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) - self.assertEqual(kwargs["X_pending"], None) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) - - # with constraints - upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 - constraints = [lambda samples: samples[..., 0] - upper_bound] - eta = math.pi * 1e-2 # testing non-standard eta - common_kwargs.update({"X_pending": self.X_pending}) - acqf = get_acquisition_function( - acquisition_function_name="qNEI", - **common_kwargs, - marginalize_dim=0, - constraints=constraints, - eta=eta, - ) - self.assertEqual(acqf, mock_acqf.return_value) - mock_acqf.assert_called_with( - model=self.model, - X_baseline=self.X_observed, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - prune_baseline=True, - marginalize_dim=0, - cache_root=True, - constraints=constraints, - eta=eta, - ) - - @mock.patch(f"{logei.__name__}.qLogNoisyExpectedImprovement") - def test_GetQLogNEI(self, mock_acqf): - # basic test - n = len(self.X_observed) - mean = torch.arange(n, dtype=torch.double).view(-1, 1) - var = torch.ones_like(mean) - self.model = MockModel(MockPosterior(mean=mean, variance=var)) - common_kwargs = { - "model": self.model, - "objective": self.objective, - "X_observed": self.X_observed, - "X_pending": self.X_pending, - "mc_samples": self.mc_samples, - "seed": self.seed, - } - acqf = get_acquisition_function( - acquisition_function_name="qLogNEI", - **common_kwargs, - marginalize_dim=0, - ) - self.assertEqual(acqf, mock_acqf.return_value) - self.assertEqual(mock_acqf.call_count, 1) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertEqual(kwargs["marginalize_dim"], 0) - self.assertEqual(kwargs["cache_root"], True) - # test with cache_root = False - acqf = get_acquisition_function( - acquisition_function_name="qLogNEI", - **common_kwargs, - marginalize_dim=0, - cache_root=False, - ) - self.assertEqual(acqf, mock_acqf.return_value) - self.assertEqual(mock_acqf.call_count, 2) - args, kwargs = mock_acqf.call_args - self.assertEqual(kwargs["cache_root"], False) - # test with non-qmc, no X_pending - common_kwargs.update({"X_pending": None}) - acqf = get_acquisition_function( - acquisition_function_name="qLogNEI", - **common_kwargs, - ) - self.assertEqual(mock_acqf.call_count, 3) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) - self.assertEqual(kwargs["X_pending"], None) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertTrue(torch.equal(kwargs["X_baseline"], self.X_observed)) - - # with constraints - upper_bound = self.Y[0, 0] + 1 / 2 # = 1.5 - constraints = [lambda samples: samples[..., 0] - upper_bound] - eta = math.pi * 1e-2 # testing non-standard eta - common_kwargs.update({"X_pending": self.X_pending}) - acqf = get_acquisition_function( - acquisition_function_name="qLogNEI", - **common_kwargs, - marginalize_dim=0, - constraints=constraints, - eta=eta, - ) - self.assertEqual(acqf, mock_acqf.return_value) - mock_acqf.assert_called_with( - model=self.model, - X_baseline=self.X_observed, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - prune_baseline=True, - marginalize_dim=0, - cache_root=True, - constraints=constraints, - eta=eta, - ) - - @mock.patch(f"{monte_carlo.__name__}.qSimpleRegret") - def test_GetQSR(self, mock_acqf): - # basic test - acqf = get_acquisition_function( - acquisition_function_name="qSR", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ) - self.assertEqual(acqf, mock_acqf.return_value) - mock_acqf.assert_called_once_with( - model=self.model, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - ) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - # test with non-qmc - acqf = get_acquisition_function( - acquisition_function_name="qSR", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - ) - self.assertEqual(mock_acqf.call_count, 2) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 2) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - - @mock.patch(f"{monte_carlo.__name__}.qUpperConfidenceBound") - def test_GetQUCB(self, mock_acqf): - # make sure beta is specified - with self.assertRaises(ValueError): - acqf = get_acquisition_function( - acquisition_function_name="qUCB", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ) - acqf = get_acquisition_function( - acquisition_function_name="qUCB", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - beta=0.3, - ) - self.assertEqual(acqf, mock_acqf.return_value) - mock_acqf.assert_called_once_with( - model=self.model, - beta=0.3, - sampler=mock.ANY, - objective=self.objective, - posterior_transform=None, - X_pending=self.X_pending, - ) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - # test with different tau, non-qmc - acqf = get_acquisition_function( - acquisition_function_name="qUCB", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - beta=0.2, - ) - self.assertEqual(mock_acqf.call_count, 2) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertEqual(kwargs["beta"], 0.2) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 2) - self.assertTrue(torch.equal(kwargs["X_pending"], self.X_pending)) - - @mock.patch(f"{moo_monte_carlo.__name__}.qExpectedHypervolumeImprovement") - def test_GetQEHVI(self, mock_acqf): - # make sure ref_point is specified - with self.assertRaises(ValueError): - acqf = get_acquisition_function( - acquisition_function_name="qEHVI", - model=self.model, - objective=self.mo_objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - Y=self.Y, - ) - # make sure Y is specified - with self.assertRaises(ValueError): - acqf = get_acquisition_function( - acquisition_function_name="qEHVI", - model=self.model, - objective=self.mo_objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ref_point=self.ref_point, - ) - # posterior transforms are not supported - with self.assertRaises(NotImplementedError): - acqf = get_acquisition_function( - acquisition_function_name="qEHVI", - model=self.model, - objective=self.mo_objective, - posterior_transform=ScalarizedPosteriorTransform(weights=torch.rand(2)), - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ref_point=self.ref_point, - ) - acqf = get_acquisition_function( - acquisition_function_name="qEHVI", - model=self.model, - objective=self.mo_objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ref_point=self.ref_point, - Y=self.Y, - ) - self.assertEqual(acqf, mock_acqf.return_value) - mock_acqf.assert_called_once_with( - constraints=None, - eta=1e-3, - model=self.model, - objective=self.mo_objective, - ref_point=self.ref_point, - partitioning=mock.ANY, - sampler=mock.ANY, - X_pending=self.X_pending, - ) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - - acqf = get_acquisition_function( - acquisition_function_name="qEHVI", - model=self.model, - objective=self.mo_objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - ref_point=self.ref_point, - Y=self.Y, - ) - self.assertEqual(mock_acqf.call_count, 2) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertEqual(kwargs["ref_point"], self.ref_point) - sampler = kwargs["sampler"] - self.assertIsInstance(kwargs["objective"], DummyMCMultiOutputObjective) - partitioning = kwargs["partitioning"] - self.assertIsInstance(partitioning, FastNondominatedPartitioning) - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 2) - # test that approximate partitioning is used when alpha > 0 - acqf = get_acquisition_function( - acquisition_function_name="qEHVI", - model=self.model, - objective=self.mo_objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - ref_point=self.ref_point, - Y=self.Y, - alpha=0.1, - ) - _, kwargs = mock_acqf.call_args - partitioning = kwargs["partitioning"] - self.assertIsInstance(partitioning, NondominatedPartitioning) - self.assertEqual(partitioning.alpha, 0.1) - # test constraints - acqf = get_acquisition_function( - acquisition_function_name="qEHVI", - model=self.model, - objective=self.mo_objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - constraints=[lambda Y: Y[..., -1]], - eta=1e-2, - seed=2, - ref_point=self.ref_point, - Y=self.Y, - ) - _, kwargs = mock_acqf.call_args - partitioning = kwargs["partitioning"] - self.assertEqual(partitioning.pareto_Y.shape[0], 0) - self.assertEqual(kwargs["eta"], 1e-2) +from botorch.utils.testing import BotorchTestCase, MockModel, MockPosterior - @mock.patch(f"{moo_monte_carlo.__name__}.qNoisyExpectedHypervolumeImprovement") - def test_GetQNEHVI(self, mock_acqf): - # make sure ref_point is specified - with self.assertRaises(ValueError): - acqf = get_acquisition_function( - acquisition_function_name="qNEHVI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ) - acqf = get_acquisition_function( - acquisition_function_name="qNEHVI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ref_point=self.ref_point, - ) - self.assertEqual(acqf, mock_acqf.return_value) - mock_acqf.assert_called_once_with( - constraints=None, - eta=1e-3, - model=self.model, - X_baseline=self.X_observed, - objective=self.objective, - ref_point=self.ref_point, - sampler=mock.ANY, - prune_baseline=True, - alpha=0.0, - X_pending=self.X_pending, - marginalize_dim=None, - cache_root=True, - ) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - sampler = kwargs["sampler"] - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 1) - # test with non-qmc - acqf = get_acquisition_function( - acquisition_function_name="qNEHVI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - ref_point=self.ref_point, - ) - self.assertEqual(mock_acqf.call_count, 2) - args, kwargs = mock_acqf.call_args - self.assertEqual(args, ()) - self.assertEqual(kwargs["ref_point"], self.ref_point) - sampler = kwargs["sampler"] - ref_point = kwargs["ref_point"] - self.assertEqual(ref_point, self.ref_point) - self.assertEqual(sampler.sample_shape, torch.Size([self.mc_samples])) - self.assertEqual(sampler.seed, 2) - # test passing alpha - acqf = get_acquisition_function( - acquisition_function_name="qNEHVI", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=2, - ref_point=self.ref_point, - alpha=0.01, +class TestGetAcquisitionFunctionDeprecation(BotorchTestCase): + def test_get_acquisition_function_deprecation(self): + msg = ( + "`get_acquisition_function` has been moved to" + " `botorch.acquisition.factory`." ) - self.assertEqual(mock_acqf.call_count, 3) - args, kwargs = mock_acqf.call_args - self.assertEqual(kwargs["alpha"], 0.01) - - def test_GetUnknownAcquisitionFunction(self): - with self.assertRaises(NotImplementedError): - get_acquisition_function( - acquisition_function_name="foo", - model=self.model, - objective=self.objective, - X_observed=self.X_observed, - X_pending=self.X_pending, - mc_samples=self.mc_samples, - seed=self.seed, - ) + with self.assertRaisesRegex(DeprecationError, msg): + get_acquisition_function() class TestConstraintUtils(BotorchTestCase): diff --git a/test/exceptions/test_errors.py b/test/exceptions/test_errors.py index 12ed03bee1..f20de1536b 100644 --- a/test/exceptions/test_errors.py +++ b/test/exceptions/test_errors.py @@ -10,6 +10,7 @@ BotorchError, BotorchTensorDimensionError, CandidateGenerationError, + DeprecationError, InputDataError, OptimizationTimeoutError, UnsupportedError, @@ -20,10 +21,14 @@ class TestBotorchExceptions(BotorchTestCase): def test_botorch_exception_hierarchy(self): self.assertIsInstance(BotorchError(), Exception) - self.assertIsInstance(CandidateGenerationError(), BotorchError) - self.assertIsInstance(InputDataError(), BotorchError) - self.assertIsInstance(UnsupportedError(), BotorchError) - self.assertIsInstance(BotorchTensorDimensionError(), BotorchError) + for ErrorClass in [ + CandidateGenerationError, + DeprecationError, + InputDataError, + UnsupportedError, + BotorchTensorDimensionError, + ]: + self.assertIsInstance(ErrorClass(), BotorchError) def test_raise_botorch_exceptions(self): for ErrorClass in (