Skip to content

Commit

Permalink
add multi-run and variable scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
Osburg committed Feb 22, 2024
1 parent 7d6af93 commit e584505
Showing 1 changed file with 148 additions and 3 deletions.
151 changes: 148 additions & 3 deletions bofire/strategies/doe/design.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
ConstraintNotFulfilledError,
NChooseKConstraint,
NonlinearConstraint,
LinearEqualityConstraint,
LinearInequalityConstraint,
NonlinearEqualityConstraint,
NonlinearInequalityConstraint,
)
from bofire.data_models.domain.api import Domain
from bofire.data_models.enum import SamplingMethodEnum
Expand All @@ -27,6 +31,147 @@
)
from bofire.strategies.enum import OptimalityCriterionEnum
from bofire.strategies.random import RandomStrategy
from re import escape, sub


# TODO: test, document, and adapt examples
class DoE:
    """Design of Experiments class for the optimization of the design of experiments.

    Wraps the ipopt-based design optimizer with two extensions:
      * optional variable scaling, mapping every input onto [-1, 1] (often
        improves ipopt convergence), with the constraints rewritten in the
        scaled coordinates, and
      * multi-start optimization (``n_runs`` independent runs), keeping the
        design with the best objective value.
    """

    def __init__(
        self,
        domain: Domain,
        delta: float = 1e-7,
        ipopt_options: Optional[Dict] = None,
        objective: OptimalityCriterionEnum = OptimalityCriterionEnum.D_OPTIMALITY,
        n_runs: int = 1,
        scale_variables: bool = False,
    ):
        """Args:
        domain (Domain): domain holding the inputs, outputs and constraints of the DoE problem.
        delta (float): regularization parameter forwarded to the objective. Defaults to 1e-7.
        ipopt_options (Dict, optional): options forwarded to ipopt. If given, this dict
            REPLACES (does not merge with) the default ``{"maxiter": 500, "disp": 0}``.
        objective (OptimalityCriterionEnum): optimality criterion. Defaults to D-optimality.
        n_runs (int): number of independent optimization runs; the best design is returned.
            Must be at least 1.
        scale_variables (bool): if True, all inputs are rescaled to [-1, 1] before optimization
            and the returned design is transformed back to original units.

        Raises:
            ValueError: if ``n_runs`` is smaller than 1.
        """
        if n_runs < 1:
            raise ValueError(f"n_runs must be at least 1, got {n_runs}.")

        self.domain = domain
        self.delta = delta
        # user-provided options replace the defaults entirely
        self.ipopt_options = {"maxiter": 500, "disp": 0}
        if ipopt_options is not None:
            self.ipopt_options = ipopt_options
        self.objective = objective
        self.n_runs = n_runs
        self.scale_variables = scale_variables

        # identity transform unless scaling is requested
        self.scaled_domain = self.domain
        self.undo_scaling = lambda x: x
        if scale_variables:
            self.do_scaling()

    def do_scaling(self) -> None:
        """Build a scaled version of the domain where every input lives on [-1, 1].

        Each input x is mapped to x_s = (x - offset) * factor with
        factor = 2 / (upper - lower) and offset = (lower + upper) / 2, i.e. the
        back-transform is x = x_s / factor + offset. All linear and nonlinear
        constraints are rewritten in the scaled coordinates; other constraint
        types (e.g. NChooseK) are kept unchanged, since they are invariant
        under a per-variable affine rescaling. Sets ``self.scaled_domain`` and
        ``self.undo_scaling``.

        Raises:
            ValueError: if an input has coinciding lower and upper bounds
                (the affine map onto [-1, 1] would be degenerate).
        """
        inputs = []
        constraints = []
        scaling_factors = []
        offsets = []

        # scale variables onto [-1, 1]
        for input in self.domain.inputs:
            bounds = input.get_bounds()
            lower = bounds[0][0]
            upper = bounds[1][0]
            if upper == lower:
                raise ValueError(
                    f"Cannot scale input {input.key}: lower and upper bound coincide."
                )
            scaling_factors.append(2.0 / (upper - lower))
            offsets.append(lower + (upper - lower) / 2.0)
            inputs.append(ContinuousInput(key=input.key, bounds=(-1, 1)))

        # map feature key -> (factor, offset) so constraints whose feature order
        # differs from the domain's input order are still scaled correctly
        # (indexing scaling_factors by the constraint's own coefficient position
        # would silently use the wrong factors)
        transform_by_key = {
            key: (scaling_factors[i], offsets[i])
            for i, key in enumerate(self.domain.inputs.get_keys())
        }

        # rewrite constraints in terms of the scaled variables
        for constraint in self.domain.constraints:
            if isinstance(
                constraint, (LinearEqualityConstraint, LinearInequalityConstraint)
            ):
                # sum_j c_j * x_j (<)= rhs with x_j = x_s_j / f_j + o_j becomes
                # sum_j (c_j / f_j) * x_s_j (<)= rhs - sum_j c_j * o_j
                coefficients = [
                    c / transform_by_key[f][0]
                    for f, c in zip(constraint.features, constraint.coefficients)
                ]
                rhs = constraint.rhs - sum(
                    c * transform_by_key[f][1]
                    for f, c in zip(constraint.features, constraint.coefficients)
                )
                constraints.append(
                    type(constraint)(
                        features=constraint.features,
                        coefficients=coefficients,
                        rhs=rhs,
                    )
                )
            elif isinstance(
                constraint,
                (NonlinearEqualityConstraint, NonlinearInequalityConstraint),
            ):
                # substitute every feature name by its back-transform so the
                # original expression is evaluated in scaled coordinates; the
                # lookbehind/lookahead ensure only whole identifiers match, and
                # escape() protects feature keys containing regex metacharacters
                expression = constraint.expression
                features = self.domain.inputs.get_keys()
                feature_replacement = [
                    f"(({f} / {scaling_factors[i]}) + {offsets[i]})"
                    for i, f in enumerate(features)
                ]
                for i, feature in enumerate(features):
                    expression = sub(
                        r"(?<!\w)" + escape(feature) + r"(?!\w)",
                        feature_replacement[i],
                        expression,
                    )
                constraints.append(
                    type(constraint)(expression=expression, features=features)
                )
            else:
                # e.g. NChooseKConstraint: unaffected by per-variable rescaling
                constraints.append(constraint)

        self.scaled_domain = Domain(
            inputs=inputs, outputs=self.domain.outputs, constraints=constraints
        )
        # inverse transform; assumes x's columns are ordered like
        # domain.inputs — TODO confirm against find_local_max_ipopt's output
        self.undo_scaling = lambda x: x / scaling_factors + offsets

    def initialize(
        self,
        model_type: Union[str, Formula],
        n_experiments: Optional[int] = None,
        sampling: Optional[pd.DataFrame] = None,
        fixed_experiments: Optional[pd.DataFrame] = None,
        partially_fixed_experiments: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Function computing an optimal design for a given domain and model.

        Args:
            model_type (str, Formula): keyword or formulaic Formula describing the model. Known keywords
                are "linear", "linear-and-interactions", "linear-and-quadratic", "fully-quadratic".
            n_experiments (int): Number of experiments. By default the value corresponds to
                the number of model terms - dimension of ker() + 3.
            sampling (pd.DataFrame): dataframe containing the initial guess.
            fixed_experiments (pd.DataFrame): dataframe containing experiments that will be definitely part of the design.
                Values are set before the optimization.
            partially_fixed_experiments (pd.DataFrame): dataframe containing (some) fixed variables for experiments.
                Values are set before the optimization. Within one experiment not all variables need to be fixed.
                Variables can be fixed to one value or can be set to a range by setting a tuple with lower and upper bound
                Non-fixed variables have to be set to None or nan.
        Returns:
            A pd.DataFrame object containing the best found input for the experiments. In general, this is only a
            local optimum.
        """
        domain = self.domain
        if self.scale_variables:
            domain = self.scaled_domain

        # TODO: when scaling is active, transform the user-provided frames into
        # the [-1, 1] coordinates; currently they are forwarded unchanged.
        # scale the sampling
        if sampling is not None:
            pass
        # scale the fixed experiments
        if fixed_experiments is not None:
            pass
        # scale the partially fixed experiments
        if partially_fixed_experiments is not None:
            pass

        # define the objective function for evaluation of the designs
        objective_class = get_objective_class(self.objective)
        objective = objective_class(
            domain=domain,
            model=get_formula_from_string(
                model_type=model_type, rhs_only=True, domain=domain
            ),
            n_experiments=n_experiments,
            delta=self.delta,
        )

        # multi-start: keep the run with the smallest objective value
        best_design = None
        for _ in range(self.n_runs):
            design = find_local_max_ipopt(
                domain=domain,
                model_type=model_type,
                n_experiments=n_experiments,
                delta=self.delta,
                ipopt_options=self.ipopt_options,
                sampling=sampling,
                fixed_experiments=fixed_experiments,
                partially_fixed_experiments=partially_fixed_experiments,
                objective=self.objective,
            )
            if best_design is None or objective.evaluate(
                design.to_numpy().flatten()
            ) < objective.evaluate(best_design.to_numpy().flatten()):
                best_design = design

        # bug fix: return the best design across runs (previously the last
        # run's design was returned, discarding the multi-run selection)
        return self.undo_scaling(best_design)


def find_local_max_ipopt_BaB(
Expand Down Expand Up @@ -472,7 +617,7 @@ def find_local_max_ipopt(
)

objective_class = get_objective_class(objective)
d_optimality = objective_class(
objective = objective_class(
domain=domain, model=model_formula, n_experiments=n_experiments, delta=delta
)

Expand Down Expand Up @@ -511,13 +656,13 @@ def find_local_max_ipopt(
#

result = minimize_ipopt(
d_optimality.evaluate,
objective.evaluate,
x0=x0,
bounds=bounds,
# "SLSQP" has no deeper meaning here and just ensures correct constraint standardization
constraints=standardize_constraints(constraints, x0, "SLSQP"),
options=_ipopt_options,
jac=d_optimality.evaluate_jacobian,
jac=objective.evaluate_jacobian,
)

design = pd.DataFrame(
Expand Down

0 comments on commit e584505

Please sign in to comment.