#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Utilities for acquisition functions.
"""
from __future__ import annotations
from typing import Callable, List, Optional, Union
import torch
from botorch.acquisition import logei, monte_carlo
from botorch.acquisition.multi_objective import (
    logei as moo_logei,
    monte_carlo as moo_monte_carlo,
)
from botorch.acquisition.objective import MCAcquisitionObjective, PosteriorTransform
from botorch.acquisition.utils import compute_best_feasible_objective
from botorch.models.model import Model
from botorch.sampling.get_sampler import get_sampler
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
    FastNondominatedPartitioning,
    NondominatedPartitioning,
)
from torch import Tensor
def get_acquisition_function(
    acquisition_function_name: str,
    model: Model,
    objective: MCAcquisitionObjective,
    X_observed: Tensor,
    posterior_transform: Optional[PosteriorTransform] = None,
    X_pending: Optional[Tensor] = None,
    constraints: Optional[List[Callable[[Tensor], Tensor]]] = None,
    eta: Optional[Union[Tensor, float]] = 1e-3,
    mc_samples: int = 512,
    seed: Optional[int] = None,
    *,
    # optional parameters that are only needed for certain acquisition functions
    tau: float = 1e-3,
    prune_baseline: bool = True,
    marginalize_dim: Optional[int] = None,
    cache_root: bool = True,
    beta: Optional[float] = None,
    ref_point: Union[None, List[float], Tensor] = None,
    Y: Optional[Tensor] = None,
    alpha: float = 0.0,
) -> monte_carlo.MCAcquisitionFunction:
    r"""Convenience function for initializing botorch acquisition functions.

    All argument validation is performed up front, before the model is
    queried, so an invalid request fails without computing any posterior.

    Args:
        acquisition_function_name: Name of the acquisition function. One of
            "qEI", "qLogEI", "qPI", "qNEI", "qLogNEI", "qSR", "qUCB",
            "qEHVI", "qLogEHVI", "qNEHVI", "qLogNEHVI".
        model: A fitted model.
        objective: A MCAcquisitionObjective.
        X_observed: A `m1 x d`-dim Tensor of `m1` design points that have
            already been observed.
        posterior_transform: A PosteriorTransform (optional). Not supported
            for the hypervolume-based acquisition functions.
        X_pending: A `m2 x d`-dim Tensor of `m2` design points whose evaluation
            is pending.
        constraints: A list of callables, each mapping a Tensor of dimension
            `sample_shape x batch-shape x q x m` to a Tensor of dimension
            `sample_shape x batch-shape x q`, where negative values imply
            feasibility. Used for all acquisition functions except qSR and qUCB.
        eta: The temperature parameter for the sigmoid function used for the
            differentiable approximation of the constraints. In case of a float the
            same eta is used for every constraint in constraints. In case of a
            tensor the length of the tensor must match the number of provided
            constraints. The i-th constraint is then estimated with the i-th
            eta value. Used for all acquisition functions except qSR and qUCB.
        mc_samples: The number of samples to use for (q)MC evaluation of the
            acquisition function.
        seed: If provided, perform deterministic optimization (i.e. the
            function to optimize is fixed and not stochastic).
        tau: Forwarded as `tau` to qProbabilityOfImprovement. Only used
            for qPI.
        prune_baseline: Forwarded as `prune_baseline` to the noisy variants
            (qNEI, qLogNEI, qNEHVI, qLogNEHVI). Unused otherwise.
        marginalize_dim: Forwarded as `marginalize_dim` to the noisy variants
            (qNEI, qLogNEI, qNEHVI, qLogNEHVI). Unused otherwise.
        cache_root: Forwarded as `cache_root` to the noisy variants
            (qNEI, qLogNEI, qNEHVI, qLogNEHVI). Unused otherwise.
        beta: Forwarded as `beta` to qUpperConfidenceBound. Required for qUCB,
            unused otherwise.
        ref_point: The reference point, as a list of floats or a Tensor.
            Required for qEHVI, qLogEHVI, qNEHVI and qLogNEHVI.
        Y: Outcomes used to build the non-dominated partitioning. Required
            for qEHVI and qLogEHVI. If `constraints` are given, only the
            entries for which every constraint evaluates to `<= 0` are kept.
        alpha: For qEHVI / qLogEHVI, a value `> 0` selects
            `NondominatedPartitioning` (with this `alpha`) instead of
            `FastNondominatedPartitioning`. For qNEHVI / qLogNEHVI it is
            forwarded as `alpha` to the acquisition function.

    Returns:
        The requested acquisition function.

    Raises:
        NotImplementedError: If `acquisition_function_name` is unknown, or if
            a `posterior_transform` is passed for a hypervolume-based
            acquisition function.
        ValueError: If `beta` is None for qUCB, `Y` is None for
            qEHVI / qLogEHVI, or `ref_point` is None for any
            hypervolume-based acquisition function.

    Example:
        >>> model = SingleTaskGP(train_X, train_Y)
        >>> obj = LinearMCObjective(weights=torch.tensor([1.0, 2.0]))
        >>> acqf = get_acquisition_function("qEI", model, obj, train_X)
    """
    name = acquisition_function_name
    best_f_names = ("qEI", "qLogEI", "qPI")
    noisy_ei_names = ("qNEI", "qLogNEI")
    hvi_names = ("qEHVI", "qLogEHVI")
    noisy_hvi_names = ("qNEHVI", "qLogNEHVI")
    known_names = (
        best_f_names + noisy_ei_names + ("qSR", "qUCB") + hvi_names + noisy_hvi_names
    )

    # Validate everything before touching the model: building the sampler
    # below requires a posterior evaluation, which is wasted work (and may
    # mask the real problem) if the request is invalid anyway.
    if name not in known_names:
        raise NotImplementedError(
            f"Unknown acquisition function {acquisition_function_name}"
        )
    if posterior_transform is not None and name in hvi_names + noisy_hvi_names:
        raise NotImplementedError(
            "PosteriorTransforms are not yet implemented for multi-objective "
            "acquisition functions."
        )
    if name == "qUCB" and beta is None:
        raise ValueError("`beta` must not be None for qUCB.")
    if name in hvi_names and Y is None:
        raise ValueError(f"`Y` must not be None for {acquisition_function_name}")
    if name in hvi_names + noisy_hvi_names and ref_point is None:
        raise ValueError(
            f"`ref_point` must not be None for {acquisition_function_name}"
        )

    # initialize the sampler
    sampler = get_sampler(
        posterior=model.posterior(X_observed[:1]),
        sample_shape=torch.Size([mc_samples]),
        seed=seed,
    )

    # instantiate and return the requested acquisition function
    if name in best_f_names:
        # Since these are the non-noisy variants, use the posterior mean at the
        # observed inputs directly to compute the best feasible value without
        # sampling. A dedicated local name is used so the `Y` argument is not
        # clobbered.
        mean_at_observed = model.posterior(
            X_observed, posterior_transform=posterior_transform
        ).mean
        obj = objective(samples=mean_at_observed, X=X_observed)
        best_f = compute_best_feasible_objective(
            samples=mean_at_observed,
            obj=obj,
            constraints=constraints,
            model=model,
            objective=objective,
            posterior_transform=posterior_transform,
            X_baseline=X_observed,
        )
        if name == "qPI":
            return monte_carlo.qProbabilityOfImprovement(
                model=model,
                best_f=best_f,
                sampler=sampler,
                objective=objective,
                posterior_transform=posterior_transform,
                X_pending=X_pending,
                tau=tau,
                constraints=constraints,
                eta=eta,
            )
        acqf_class = (
            monte_carlo.qExpectedImprovement
            if name == "qEI"
            else logei.qLogExpectedImprovement
        )
        return acqf_class(
            model=model,
            best_f=best_f,
            sampler=sampler,
            objective=objective,
            posterior_transform=posterior_transform,
            X_pending=X_pending,
            constraints=constraints,
            eta=eta,
        )

    if name in noisy_ei_names:
        acqf_class = (
            monte_carlo.qNoisyExpectedImprovement
            if name == "qNEI"
            else logei.qLogNoisyExpectedImprovement
        )
        return acqf_class(
            model=model,
            X_baseline=X_observed,
            sampler=sampler,
            objective=objective,
            posterior_transform=posterior_transform,
            X_pending=X_pending,
            prune_baseline=prune_baseline,
            marginalize_dim=marginalize_dim,
            cache_root=cache_root,
            constraints=constraints,
            eta=eta,
        )

    if name == "qSR":
        return monte_carlo.qSimpleRegret(
            model=model,
            sampler=sampler,
            objective=objective,
            posterior_transform=posterior_transform,
            X_pending=X_pending,
        )

    if name == "qUCB":
        return monte_carlo.qUpperConfidenceBound(
            model=model,
            beta=beta,
            sampler=sampler,
            objective=objective,
            posterior_transform=posterior_transform,
            X_pending=X_pending,
        )

    if name in hvi_names:
        # get feasible points
        feasible_Y = Y
        if constraints is not None:
            feas = torch.stack([c(Y) <= 0 for c in constraints], dim=-1).all(dim=-1)
            feasible_Y = Y[feas]
        obj = objective(feasible_Y)
        # The partitioning needs a tensor reference point matching the outcomes.
        ref_point_tensor = torch.as_tensor(
            ref_point, dtype=feasible_Y.dtype, device=feasible_Y.device
        )
        if alpha > 0:
            partitioning = NondominatedPartitioning(
                ref_point=ref_point_tensor,
                Y=obj,
                alpha=alpha,
            )
        else:
            partitioning = FastNondominatedPartitioning(
                ref_point=ref_point_tensor,
                Y=obj,
            )
        acqf_class = (
            moo_monte_carlo.qExpectedHypervolumeImprovement
            if name == "qEHVI"
            else moo_logei.qLogExpectedHypervolumeImprovement
        )
        return acqf_class(
            model=model,
            ref_point=ref_point,
            partitioning=partitioning,
            sampler=sampler,
            objective=objective,
            constraints=constraints,
            eta=eta,
            X_pending=X_pending,
        )

    # Only qNEHVI / qLogNEHVI remain: the name was validated against
    # `known_names` above and every other family has already returned.
    acqf_class = (
        moo_monte_carlo.qNoisyExpectedHypervolumeImprovement
        if name == "qNEHVI"
        else moo_logei.qLogNoisyExpectedHypervolumeImprovement
    )
    return acqf_class(
        model=model,
        ref_point=ref_point,
        X_baseline=X_observed,
        sampler=sampler,
        objective=objective,
        constraints=constraints,
        eta=eta,
        prune_baseline=prune_baseline,
        alpha=alpha,
        X_pending=X_pending,
        marginalize_dim=marginalize_dim,
        cache_root=cache_root,
    )