import torch
from typing import Optional, Callable
from torch import Tensor
from scipy.spatial.distance import cdist
from botorch.utils.transforms import normalize
from botorch.models.model_list_gp_regression import ModelListGP
from botorch.acquisition.objective import GenericMCObjective
from botorch.utils.sampling import draw_sobol_samples, sample_simplex
from botorch.utils.multi_objective.scalarization import get_chebyshev_scalarization
from botorch.sampling.normal import SobolQMCNormalSampler
from botorch.acquisition.multi_objective.parego import qLogNParEGO
from botorch.acquisition.logei import qLogNoisyExpectedImprovement
from botorch.acquisition.multi_objective.logei import qLogExpectedHypervolumeImprovement
from botorch.acquisition.multi_objective.utils import (
sample_optimal_points,
random_search_optimizer,
compute_sample_box_decomposition,
)
from botorch.acquisition.multi_objective.predictive_entropy_search import (
qMultiObjectivePredictiveEntropySearch,
)
from botorch.acquisition.multi_objective.max_value_entropy_search import (
qLowerBoundMultiObjectiveMaxValueEntropySearch,
)
from botorch.acquisition.multi_objective.joint_entropy_search import (
qLowerBoundMultiObjectiveJointEntropySearch,
)
from botorch.optim.optimize import optimize_acqf, optimize_acqf_list
from botorch.utils.multi_objective.box_decompositions import FastNondominatedPartitioning
from qpots.utils.utils import unstandardize, select_candidates
from qpots.utils.pymoo_problem import PyMooFunction, nsga2
from qpots.function import Function
from qpots.tsemo_runner import TSEMORunner
[docs]
class Acquisition:
"""
A class providing various acquisition functions and methods for multi-objective optimization.
"""
[docs]
def __init__(
self,
func: Function,
gps: ModelListGP,
cons: Optional[Callable] = None,
device: torch.device = torch.device("cpu"),
q: int = 1,
NUM_RESTARTS: int = 10,
RAW_SAMPLES: int = 512
) -> None:
"""
Initialize the multi-objective acquisition class.
This class provides various acquisition functions for multi-objective Bayesian
optimization. It supports Gaussian Process models and handles inequality constraints
if provided.
Parameters
----------
func : Function
The test function being optimized.
gps : ModelListGP
A list of Gaussian Process models used for Bayesian optimization.
cons : Optional[Callable], optional
A vector-valued function representing inequality constraints. If provided,
the acquisition function will account for feasibility constraints.
device : torch.device, optional
The computational device to use (CPU or GPU). Defaults to ``torch.device("cpu")``.
q : int, optional
The number of candidate points to sample per iteration. Defaults to 1.
NUM_RESTARTS : int, optional
The number of restarts for optimizing the acquisition function.
A higher value can improve optimization quality. Defaults to 10.
RAW_SAMPLES : int, optional
The number of raw samples used in acquisition optimization.
Higher values increase exploration but may slow computation. Defaults to 512.
"""
self.func = func
self.gps = gps
self.cons = cons
self.device = device
self.q = q
self.NUM_RESTARTS = NUM_RESTARTS
self.RAW_SAMPLES = RAW_SAMPLES
self.nobj = gps.nobj
self.ncons = gps.ncons
def _nystrom_approx(
self,
x: Tensor,
gps: ModelListGP,
m: int,
pareto_set: Optional[Tensor] = None,
col_choice: str = "pareto",
seed_iter: int = 1,
reg: float = 1e-6,
) -> Tensor:
"""
Perform Thompson sampling using the Nyström approximation for Gaussian Process (GP) models.
The Nyström approximation is a method for approximating large covariance matrices
in Gaussian Process inference, improving computational efficiency while preserving
accuracy. This function selects a subset of columns to approximate the full covariance
matrix and then performs posterior sampling.
Parameters
----------
x : Tensor
A tensor of input points for which posterior samples are computed.
gps : ModelListGP
A list of Gaussian Process models used to estimate the posterior distribution.
m : int
The number of landmarks (subset size) used in the Nyström approximation.
pareto_set : Optional[Tensor], optional
A tensor containing Pareto optimal points. Required if ``col_choice`` is 'pareto'.
Defaults to None.
col_choice : str, optional
The column selection strategy for the Nyström approximation.
Options:
- 'pareto' (default): Select columns based on proximity to Pareto optimal points.
- 'random': Select a random subset of columns.
seed_iter : int, optional
An iteration index for seeding randomness in sampling. Defaults to 1.
reg : float, optional
A small regularization constant added to the covariance matrix for numerical
stability during matrix inversion. Defaults to 1e-6.
Returns
-------
Tensor
A tensor containing posterior samples from the GP models. Infeasible points
are penalized if constraints exist.
"""
torch.manual_seed(1024 + seed_iter)
samples_list = []
for gp_model in gps.models:
posterior = gp_model.posterior(normalize(x.to(self.device), gps.bounds.to(self.device)))
mean = posterior.mean.to(self.device)
covariance = posterior.mvn.covariance_matrix.to(self.device)
if col_choice == "random":
indices = torch.randperm(covariance.shape[-1])[:m]
elif col_choice == "pareto":
if pareto_set is None:
raise ValueError("Pareto set is required for 'pareto' column choice.")
D = cdist(pareto_set.cpu(), x.cpu().numpy())
cand_indices = D.argmin(axis=-1)
indices = torch.unique(torch.tensor(cand_indices).argsort()[:m]).to(self.device)
if len(indices) < m:
extra_indices = torch.randperm(covariance.shape[-1])[:m - len(indices)].to(self.device)
indices = torch.cat((indices, extra_indices)).to(self.device)
else:
raise NotImplementedError(f"Column choice '{col_choice}' is not implemented.")
K_nm = covariance[:, indices]
apprx_covariance = covariance[indices][:, indices]
while True:
try:
L_mm_cov = torch.linalg.cholesky(
apprx_covariance + reg * torch.eye(m).to(self.device)
).to(self.device)
break
except torch.linalg.LinAlgError:
reg *= 10
z = torch.rand(m, dtype=torch.float64).to(self.device)
sample = mean + K_nm @ (L_mm_cov @ z).reshape(-1, 1)
samples_list.append(sample.detach())
Ys_ = unstandardize(torch.cat(samples_list, -1), gps.train_y.to(self.device))
if self.ncons > 0:
ind_feasible = Ys_[..., -self.ncons :] <= 0
Ys_[~ind_feasible.squeeze(), : self.nobj] = -1e12 # Arbitrary low value for infeasible points
Ys = Ys_[..., : self.nobj]
else:
Ys = Ys_
return -Ys
def _gp_posterior(self, x: Tensor, gps: ModelListGP, seed_iter: int = 1) -> Tensor:
"""
Compute posterior samples for PyMoo optimization.
Parameters
----------
x : Tensor
A tensor of input points for which posterior samples are computed.
gps : ModelListGP
A list of Gaussian Process models used to estimate the posterior distribution.
seed_iter : int, optional
An iteration index for seeding randomness in sampling. Defaults to 1.
Returns
-------
Tensor
A tensor containing posterior samples from the GP models, with
infeasible points penalized if constraints exist.
"""
torch.manual_seed(1024 + seed_iter)
Ys_ = [
model.posterior(normalize(x, gps.bounds)).sample().reshape(-1, 1) for model in gps.models
]
Ys_ = unstandardize(torch.cat(Ys_, -1), gps.train_y.to(self.device))
if self.ncons > 0:
ind_feasible = (Ys_[..., -self.ncons :] >= 0).all(dim=-1)
Ys_[~ind_feasible.squeeze(), : self.nobj] = -1e12 # Penalize infeasible points
Ys = Ys_[..., : self.nobj]
else:
Ys = Ys_
return -Ys
[docs]
def qpots(
self,
bounds: Tensor,
iteration: int,
**kwargs,
) -> Tensor:
"""
Perform Pareto Optimal Thompson Sampling (qPOTS).
Parameters
----------
bounds : Tensor
A tensor representing the lower and upper bounds for the
optimization problem.
iteration : int
The current iteration index, used for seeding randomness.
**kwargs : dict
Additional arguments for customization, including:
- ``nystrom`` (int): Whether to use the Nystrom approximation (1 for yes, 0 for no).
- ``iters`` (int): Number of iterations for approximation.
- ``nychoice`` (str): Column selection method for the Nystrom approximation.
- ``dim`` (int): Dimensionality of the problem.
- ``ngen`` (int): Number of generations for NSGA-II optimization.
- ``q`` (int): Number of candidates to select.
Returns
-------
Tensor
A tensor containing the selected candidate points after Pareto Optimal
Thompson Sampling.
"""
def track_pareto(res):
pareto_set[0] = res.opt.get("X")
pareto_set = [None]
if kwargs["nystrom"] == 1:
gp_posterior_approx_ = lambda x: self._nystrom_approx(
x.to(self.device),
self.gps,
int(0.2 * kwargs["iters"]),
normalize(torch.tensor(pareto_set[0]).to(self.device), bounds).cpu().numpy()
if pareto_set[0] is not None
else torch.zeros_like(x),
col_choice=kwargs["nychoice"],
)
pymoo_func_gp = PyMooFunction(
gp_posterior_approx_,
n_var=kwargs["dim"],
n_obj=self.nobj,
xl=bounds[0].detach().cpu().numpy(),
xu=bounds[1].detach().cpu().numpy(),
)
res = nsga2(
pymoo_func_gp,
ngen=kwargs["ngen"],
pop_size=100 * kwargs["dim"],
seed=2430,
callback=track_pareto,
)
else:
gp_posterior_ = lambda x: self.gp_posterior(
x.to(self.device), self.gps, seed_iter=iteration
)
pymoo_func_gp = PyMooFunction(
gp_posterior_,
n_var=kwargs["dim"],
n_obj=self.nobj,
xl=bounds[0].detach().cpu().numpy(),
xu=bounds[1].detach().cpu().numpy(),
)
res = nsga2(
pymoo_func_gp,
ngen=kwargs["ngen"],
pop_size=100 * kwargs["dim"],
seed=2430,
)
selected_candidates = select_candidates(
self.gps, res.X, self.device, q=kwargs["q"], seed=2043
)
return normalize(selected_candidates, bounds)
[docs]
def qlogei(self, ref_point: Tensor = torch.tensor([0.0, 0.0])) -> Tensor:
"""
Optimize the qLogEI acquisition function and return new candidate points.
Parameters
----------
ref_point : Tensor, optional
The reference point for hypervolume calculation, typically representing
a baseline for performance. Defaults to ``[0.0, 0.0]``.
Returns
-------
Tensor
A tensor containing the new candidate points selected based on
qLogEI optimization.
"""
standard_bounds = torch.zeros(2, self.func.dim, device=self.device)
standard_bounds[1] = 1
train_y = self.gps.train_y.to(self.device)
partitioning = FastNondominatedPartitioning(
ref_point=ref_point.to(self.device), Y=train_y[..., : self.nobj]
)
model = ModelListGP(*self.gps.models).to(self.device)
acq_func = qLogExpectedHypervolumeImprovement(
model=model,
ref_point=ref_point.to(self.device),
partitioning=partitioning,
constraints=[lambda Z: Z[..., -self.gps.ncons]],
)
while True:
try:
new_x, _ = optimize_acqf(
acq_function=acq_func,
bounds=standard_bounds,
q=self.q,
num_restarts=self.NUM_RESTARTS,
raw_samples=self.RAW_SAMPLES,
options={"batch_limit": 3, "maxiter": 1000},
sequential=True,
)
break
except RuntimeError:
pass
return new_x.to(self.device)
[docs]
def parego(self) -> Tensor:
"""
Perform qParEGO optimization using random weights for scalarization.
Returns
-------
Tensor
A tensor containing the new candidate points selected based on qParEGO optimization.
"""
standard_bounds = torch.tensor(
[[0.0] * self.func.dim, [1.0] * self.func.dim],
dtype=torch.double,
).to(self.device) # normalized bounds
train_x = self.gps.train_x.to(self.device)
train_y = self.gps.train_y.to(self.device)
print(f"Inside parego: train_x shape: {train_x.shape}")
sampler = SobolQMCNormalSampler(sample_shape=torch.Size([128]))
model = ModelListGP(*self.gps.models).to(self.device)
pred = unstandardize(model.posterior(train_x).mean, train_y)
acq_func_list = []
for _ in range(self.q):
weights = sample_simplex(self.func.nobj + self.gps.ncons).squeeze()
objective = GenericMCObjective(get_chebyshev_scalarization(weights=weights, Y=pred))
acq_func = qLogNoisyExpectedImprovement(
model=model,
objective=objective,
X_baseline=train_x,
sampler=sampler,
prune_baseline=True,
constraints=[lambda z: z[..., -self.gps.ncons]],
)
acq_func_list.append(acq_func)
new_x, _ = optimize_acqf_list(
acq_function_list=acq_func_list,
bounds=standard_bounds,
num_restarts=self.NUM_RESTARTS,
raw_samples=self.RAW_SAMPLES,
options={"batch_limit": 3, "maxiter": 1000},
)
return new_x.to(self.device)
[docs]
def qlogparego(self) -> Tensor:
"""
Perform qParEGO optimization using qNParEGO from BoTorch.
Returns
-------
Tensor
A tensor containing the candidate points selected based on qParEGO optimization.
"""
# Standardize bounds for optimization
standard_bounds = torch.tensor(
[[0.0] * self.func.dim, [1.0] * self.func.dim],
dtype=torch.double,
).to(self.device) # normalized bounds
train_x = self.gps.train_x.to(self.device)
sampler = SobolQMCNormalSampler(sample_shape=torch.Size([128]))
model = ModelListGP(*self.gps.models).to(self.device)
# Use qNParEGO for acquisition function
acq_func = qLogNParEGO(
model=model,
X_baseline=train_x,
sampler=sampler,
constraints=[lambda z: z[..., -self.gps.ncons]],
prune_baseline=True, # This is recommended to improve performance
)
# Optimize the acquisition function
new_x, _ = optimize_acqf(
acq_function=acq_func,
bounds=standard_bounds,
q=self.q,
num_restarts=self.NUM_RESTARTS,
raw_samples=self.RAW_SAMPLES,
options={"batch_limit": 3, "maxiter": 1000},
)
return new_x.to(self.device)
[docs]
def pesmo(self) -> Tensor:
"""
Perform Predictive Entropy Search for Multi-Objective optimization (PESMO).
Returns
-------
Tensor
A tensor containing the candidate points selected by PESMO.
"""
dim = self.func.dim
bounds = torch.row_stack([torch.zeros(dim), torch.ones(dim)]).to(self.device)
model = ModelListGP(*self.gps.models).to(self.device)
ps, _ = sample_optimal_points(
model=model,
bounds=bounds.double(),
num_samples=11,
num_points=3,
optimizer=random_search_optimizer,
optimizer_kwargs={"pop_size": 10000, "max_tries": 45},
)
acqf = qMultiObjectivePredictiveEntropySearch(model=model, pareto_sets=ps)
candidates, _ = optimize_acqf(
acq_function=acqf,
bounds=bounds.double(),
q=self.q,
num_restarts=5,
raw_samples=512,
options={"with_grad": False}
)
return candidates.to(self.device)
[docs]
def mesmo(self) -> Tensor:
"""
Perform Multi-Objective Max-Value Entropy Search (MESMO).
Returns
-------
Tensor
A tensor containing the candidate points selected by MESMO.
"""
dim = self.func.dim
bounds = torch.row_stack([torch.zeros(dim), torch.ones(dim)]).to(self.device)
model = ModelListGP(*self.gps.models).to(self.device)
ps, pf = sample_optimal_points(
model=model,
bounds=bounds.double(),
num_samples=10,
num_points=10,
optimizer=random_search_optimizer,
optimizer_kwargs={"pop_size": 10000, "max_tries": 45},
)
hypercell_bounds = compute_sample_box_decomposition(pf)
acqf = qLowerBoundMultiObjectiveMaxValueEntropySearch(
model=model,
hypercell_bounds=hypercell_bounds,
estimation_type="LB",
)
candidates, _ = optimize_acqf(
acq_function=acqf,
bounds=bounds.double(),
q=self.q,
num_restarts=8,
raw_samples=512,
sequential=True,
)
return candidates.to(self.device)
[docs]
def jesmo(self) -> Tensor:
"""
Perform Joint Entropy Search for Multi-Objective optimization (JESMO).
JESMO is an acquisition function that uses joint entropy search to efficiently
explore the Pareto frontier in multi-objective Bayesian optimization.
Returns
-------
Tensor
A tensor containing the candidate points generated by JESMO.
"""
dim = self.func.dim
bounds = torch.row_stack([torch.zeros(dim), torch.ones(dim)]).to(self.device)
model = ModelListGP(*self.gps.models).to(self.device)
ps, pf = sample_optimal_points(
model=model,
bounds=bounds.double(),
num_samples=10,
num_points=10,
optimizer=random_search_optimizer,
optimizer_kwargs={"pop_size": 10000, "max_tries": 45},
)
hypercell_bounds = compute_sample_box_decomposition(pf)
acqf = qLowerBoundMultiObjectiveJointEntropySearch(
model=model,
pareto_sets=ps,
pareto_fronts=pf,
hypercell_bounds=hypercell_bounds,
estimation_type="LB",
)
candidates, _ = optimize_acqf(
acq_function=acqf,
bounds=bounds.double(),
q=self.q,
num_restarts=8,
raw_samples=512,
sequential=True,
)
return candidates.to(self.device)
[docs]
def sobol(self) -> Tensor:
"""
Generate random Sobol sequence samples.
Returns
-------
Tensor
A tensor of randomly generated candidate points using the Sobol sequence.
"""
standard_bounds = torch.row_stack(
[torch.zeros(self.func.dim), torch.ones(self.func.dim)]
)
return draw_sobol_samples(
bounds=standard_bounds, n=1, q=self.q
).squeeze(1).to(self.device)
[docs]
def tsemo(self, save_dir: str, iters: int, ref_point: Tensor, train_shape: int, rep: int = 0):
"""
Perform Thompson Sampling Efficient Multiobjective Optimization (TS-EMO).
Parameters
----------
save_dir : str
The directory to save the TS-EMO results.
iters : int
How many iterations TS-EMO should run for.
ref_point : Tensor
The reference point for the hypervolume calculation.
train_shape : int
The shape for determining the size of bounds.
rep : int, optional
The repetition of the experiment. Defaults to 0.
Returns
-------
x : np.ndarray
The inputs of the function chosen by TS-EMO.
y : np.ndarray
The outputs of the function for each input.
times : np.ndarray
The time that each iteration takes.
hv : list
The list of the hypervolume at each iteration.
pf : Tensor
The Pareto frontier determined by TS-EMO.
"""
ts = TSEMORunner(self.func.name,
self.gps.train_x,
self.gps.train_y,
lb=[0.0]*self.gps.train_x.shape[1],
ub=[1.0]*self.gps.train_x.shape[1],
iters=iters,
batch_number=self.q)
x, y, times = ts.tsemo_run(save_dir, rep)
hv, pf = ts.tsemo_hypervolume(y, ref_point, train_shape, iters)
return x, y, times, hv, pf