#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""Core abstractions and generic optimizers."""
from __future__ import annotations
import re
from dataclasses import dataclass, replace
from enum import auto, Enum
from itertools import count
from sys import maxsize
from time import monotonic
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from botorch.optim.closures import NdarrayOptimizationClosure
from botorch.optim.utils import get_bounds_as_ndarray
from numpy import asarray, float64 as np_float64, ndarray
from scipy.optimize import minimize
from torch import Tensor
from torch.optim.adam import Adam
from torch.optim.optimizer import Optimizer
try:
    from torch.optim.lr_scheduler import LRScheduler
except ImportError:  # pragma: no cover
    from torch.optim.lr_scheduler import _LRScheduler as LRScheduler  # pragma: no cover
_LBFGSB_MAXITER_MAXFUN_REGEX = re.compile(  # regex for maxiter and maxfun messages
    "TOTAL NO. of (ITERATIONS REACHED LIMIT|f AND g EVALUATIONS EXCEEDS LIMIT)"
)
[docs]class OptimizationStatus(int, Enum):
    RUNNING = auto()  # incomplete
    SUCCESS = auto()  # optimizer converged
    FAILURE = auto()  # terminated abnormally
    STOPPED = auto()  # stopped due to user provided criterion 
[docs]@dataclass
class OptimizationResult:
    step: int
    fval: Union[float, int]
    status: OptimizationStatus
    runtime: Optional[float] = None
    message: Optional[str] = None 
[docs]def scipy_minimize(
    closure: Union[
        Callable[[], Tuple[Tensor, Sequence[Optional[Tensor]]]],
        NdarrayOptimizationClosure,
    ],
    parameters: Dict[str, Tensor],
    bounds: Optional[Dict[str, Tuple[Optional[float], Optional[float]]]] = None,
    callback: Optional[Callable[[Dict[str, Tensor], OptimizationResult], None]] = None,
    x0: Optional[ndarray] = None,
    method: str = "L-BFGS-B",
    options: Optional[Dict[str, Any]] = None,
) -> OptimizationResult:
    r"""Generic scipy.optimize.minimize-based optimization routine.
    Args:
        closure: Callable that returns a tensor and an iterable of gradient tensors or
            NdarrayOptimizationClosure instance.
        parameters: A dictionary of tensors to be optimized.
        bounds: A dictionary mapping parameter names to lower and upper bounds.
        callback: A callable taking `parameters` and an OptimizationResult as arguments.
        x0: An optional initialization vector passed to scipy.optimize.minimize.
        method: Solver type, passed along to scipy.minimize.
        options: Dictionary of solver options, passed along to scipy.minimize.
    Returns:
        An OptimizationResult summarizing the final state of the run.
    """
    start_time = monotonic()
    wrapped_closure = (
        closure
        if isinstance(closure, NdarrayOptimizationClosure)
        else NdarrayOptimizationClosure(closure, parameters)
    )
    if bounds is None:
        bounds_np = None
    else:
        bounds_np = get_bounds_as_ndarray(parameters, bounds)
    if callback is None:
        wrapped_callback = None
    else:
        call_counter = count(1)  # callbacks are typically made at the end of each iter
        def wrapped_callback(x: ndarray):
            result = OptimizationResult(
                step=next(call_counter),
                fval=float(wrapped_closure(x)[0]),
                status=OptimizationStatus.RUNNING,
                runtime=monotonic() - start_time,
            )
            return callback(parameters, result)  # pyre-ignore [29]
    raw = minimize(
        wrapped_closure,
        wrapped_closure.state if x0 is None else x0.astype(np_float64, copy=False),
        jac=True,
        bounds=bounds_np,
        method=method,
        options=options,
        callback=wrapped_callback,
    )
    # Post-processing and outcome handling
    wrapped_closure.state = asarray(raw.x)  # set parameter state to optimal values
    msg = raw.message if isinstance(raw.message, str) else raw.message.decode("ascii")
    if raw.success:
        status = OptimizationStatus.SUCCESS
    else:
        status = (  # Check whether we stopped due to reaching maxfun or maxiter
            OptimizationStatus.STOPPED
            if _LBFGSB_MAXITER_MAXFUN_REGEX.search(msg)
            else OptimizationStatus.FAILURE
        )
    return OptimizationResult(
        fval=raw.fun,
        step=raw.nit,
        status=status,
        message=msg,
        runtime=monotonic() - start_time,
    ) 
[docs]def torch_minimize(
    closure: Callable[[], Tuple[Tensor, Sequence[Optional[Tensor]]]],
    parameters: Dict[str, Tensor],
    bounds: Optional[Dict[str, Tuple[Optional[float], Optional[float]]]] = None,
    callback: Optional[Callable[[Dict[str, Tensor], OptimizationResult], None]] = None,
    optimizer: Union[Optimizer, Callable[[List[Tensor]], Optimizer]] = Adam,
    scheduler: Optional[Union[LRScheduler, Callable[[Optimizer], LRScheduler]]] = None,
    step_limit: Optional[int] = None,
    stopping_criterion: Optional[Callable[[Tensor], bool]] = None,
) -> OptimizationResult:
    r"""Generic torch.optim-based optimization routine.
    Args:
        closure: Callable that returns a tensor and an iterable of gradient tensors.
            Responsible for setting relevant parameters' `grad` attributes.
        parameters: A dictionary of tensors to be optimized.
        bounds: An optional dictionary of bounds for elements of `parameters`.
        callback: A callable taking `parameters` and an OptimizationResult as arguments.
        step_limit: Integer specifying a maximum number of optimization steps.
            One of `step_limit` or `stopping_criterion` must be passed.
        stopping_criterion: A StoppingCriterion for the optimization loop.
        optimizer: A `torch.optim.Optimizer` instance or a factory that takes
            a list of parameters and returns an `Optimizer` instance.
        scheduler: A `torch.optim.lr_scheduler._LRScheduler` instance or a factory
            that takes a `Optimizer` instance and returns a `_LRSchedule` instance.
    Returns:
        An OptimizationResult summarizing the final state of the run.
    """
    start_time = monotonic()
    if step_limit is None:
        if stopping_criterion is None:
            raise RuntimeError("No termination conditions were given.")
        step_limit = maxsize
    if not isinstance(optimizer, Optimizer):
        optimizer = optimizer(list(parameters.values()))
    if not (scheduler is None or isinstance(scheduler, LRScheduler)):
        scheduler = scheduler(optimizer)
    _bounds = (
        {}
        if bounds is None
        else {name: limits for name, limits in bounds.items() if name in parameters}
    )
    result: OptimizationResult
    for step in range(step_limit):
        fval, _ = closure()
        result = OptimizationResult(
            step=step,
            fval=fval.detach().cpu().item(),
            status=OptimizationStatus.RUNNING,
            runtime=monotonic() - start_time,
        )
        # TODO: Update stopping_criterion API to return a message.
        if stopping_criterion and stopping_criterion(fval):
            result.status = OptimizationStatus.STOPPED
            result.message = "`torch_minimize` stopped due to `stopping_criterion`."
        if callback:
            callback(parameters, result)
        if result.status != OptimizationStatus.RUNNING:
            break
        optimizer.step()
        for name, (lower, upper) in _bounds.items():
            parameters[name].data = parameters[name].clamp(min=lower, max=upper)
        if scheduler:
            scheduler.step()
    if result.status != OptimizationStatus.RUNNING:
        return replace(result, runtime=monotonic() - start_time)
    # Account for final parameter update when stopping due to step_limit
    return OptimizationResult(
        step=step + 1,
        fval=closure()[0].detach().cpu().item(),
        status=OptimizationStatus.STOPPED,
        runtime=monotonic() - start_time,
        message=f"`torch_minimize` stopped after reaching step_limit={step_limit}.",
    )