
CMA

sgptools.methods.CMA

Bases: Method

Informative sensor placement / path optimization using CMA-ES (Covariance Matrix Adaptation Evolution Strategy).

CMA-ES is a derivative-free, population-based evolutionary optimizer well suited to non-convex, non-smooth objectives. Here, it searches over the flattened vector of sensing locations / waypoints.

Refer to the following paper for more details:
  • Hitz et al., 2017. Adaptive Continuous-Space Informative Path Planning for Online Environmental Monitoring.
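
To make the search loop concrete, here is a minimal sketch of a derivative-free, population-based optimizer acting on a flattened vector of 2D locations. It is a plain (mu, lambda) evolution strategy with a decaying step size, not CMA-ES itself (there is no covariance adaptation), and `toy_reward` is a hypothetical stand-in for an information-gain objective.

```python
import random

def toy_reward(flat):
    """Hypothetical reward: higher when the points are close to (1, 1)."""
    points = [flat[i:i + 2] for i in range(0, len(flat), 2)]
    return -sum((x - 1.0) ** 2 + (y - 1.0) ** 2 for x, y in points)

def simple_es(cost, x0, sigma=0.5, popsize=12, elite=3, steps=60, seed=0):
    """Minimize `cost` with a basic (mu, lambda) evolution strategy."""
    rng = random.Random(seed)
    mean = list(x0)
    for _ in range(steps):
        # Sample a population around the current mean.
        population = [[m + sigma * rng.gauss(0.0, 1.0) for m in mean]
                      for _ in range(popsize)]
        # Keep the best candidates and average them to get the new mean.
        population.sort(key=cost)
        best = population[:elite]
        mean = [sum(col) / elite for col in zip(*best)]
        sigma *= 0.95  # simple step-size decay (CMA-ES adapts this instead)
    return mean

x0 = [0.0] * 6  # three 2D locations, flattened
cost = lambda flat: -toy_reward(flat)  # minimizers need the negated reward
x_best = simple_es(cost, x0)
```

The optimizer only ever sees a flat list and a scalar cost, which is why the class flattens `X_init` and negates the reward before handing them to CMA-ES.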

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| objective | Objective | Objective object to evaluate information gain. |
| transform | Transform \| None | Optional transform applied to candidate solutions (e.g., for IPP / FoV). |
| X_init | ndarray | Flattened initial guess of the sensing locations. |
| pbounds | Polygon | Convex hull of the X_objective points, used as an implicit geometric bound (not enforced directly by CMA-ES). |
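
Because the hull is not enforced by the optimizer, a caller may want to check a solution against it afterwards. The sketch below shows the idea in plain Python with a monotone-chain hull and a containment test for convex polygons. It is an illustration only: the class builds `pbounds` with shapely, and the helper names `convex_hull` and `inside_hull` are hypothetical.

```python
def cross(o, a, b):
    """z-component of (a - o) x (b - o); > 0 means a counter-clockwise turn."""
    return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])

def convex_hull(points):
    """Andrew's monotone chain; returns hull vertices counter-clockwise."""
    pts = sorted(set(map(tuple, points)))
    if len(pts) <= 2:
        return pts
    lower, upper = [], []
    for p in pts:
        while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
            lower.pop()
        lower.append(p)
    for p in reversed(pts):
        while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
            upper.pop()
        upper.append(p)
    return lower[:-1] + upper[:-1]

def inside_hull(hull, p):
    """True if p lies inside or on the counter-clockwise convex polygon."""
    n = len(hull)
    return all(cross(hull[i], hull[(i + 1) % n], p) >= 0 for i in range(n))

X_objective = [(0, 0), (4, 0), (4, 3), (0, 3), (2, 1)]
hull = convex_hull(X_objective)
solution = [(1.0, 1.0), (5.0, 1.0)]  # second point drifted outside the hull
flags = [inside_hull(hull, p) for p in solution]
```

Here `flags` marks which optimized points stay within the region covered by `X_objective`; what to do with violating points (clip, penalize, or re-run) is left to the caller.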

Source code in sgptools/methods.py
class CMA(Method):
    """Informative sensor placement / path optimization using CMA-ES
    (Covariance Matrix Adaptation Evolution Strategy).

    CMA-ES is a derivative-free, population-based evolutionary optimizer well
    suited to non-convex, non-smooth objectives. Here, it searches over the
    flattened vector of sensing locations / waypoints.

    Refer to the following paper for more details:
        - Hitz et al., 2017. *Adaptive Continuous-Space Informative Path Planning
        for Online Environmental Monitoring.*

    Attributes:
        objective (Objective):
            Objective object to evaluate information gain.
        transform (Transform | None):
            Optional transform applied to candidate solutions (e.g., for IPP / FoV).
        X_init (np.ndarray):
            Flattened initial guess of the sensing locations.
        pbounds (shapely.geometry.Polygon):
            Convex hull of the `X_objective` points, used as an implicit geometric
            bound (not enforced directly by CMA-ES).
    """

    def __init__(self,
                 num_sensing: int,
                 X_objective: np.ndarray,
                 kernel: gpflow.kernels.Kernel,
                 noise_variance: float,
                 transform: Optional[Transform] = None,
                 num_robots: int = 1,
                 X_candidates: Optional[np.ndarray] = None,
                 num_dim: Optional[int] = None,
                 objective: Union[str, Objective] = 'SLogMI',
                 X_init: Optional[np.ndarray] = None,
                 **kwargs: Any):
        """Initialize a CMA-ES-based optimization method.

        Args:
            num_sensing (int):
                Number of sensing locations per robot.
            X_objective (np.ndarray):
                Array of shape `(n, d)` used to define the GP objective and
                to build the convex hull bounds.
            kernel (gpflow.kernels.Kernel):
                GPflow kernel used inside the objective.
            noise_variance (float):
                Observation noise variance used inside the objective.
            transform (Transform | None):
                Optional transform applied to candidate solutions before objective
                evaluation and constraints.
            num_robots (int):
                Number of robots / agents. Defaults to 1.
            X_candidates (np.ndarray | None):
                Optional discrete candidate set of locations with shape `(c, d)`.
                If provided, continuous solutions are snapped to candidates.
            num_dim (int | None):
                Dimensionality of sensing locations. If `None`, defaults to
                `X_objective.shape[-1]`, or to `X_init.shape[-1]` if `X_init`
                is provided.
            objective (str | Objective):
                Objective specification, either a string key for
                `get_objective` or a pre-instantiated `Objective`.
            X_init (np.ndarray | None):
                Initial guess for the sensing locations, with shape
                `(num_sensing * num_robots, num_dim)`. If `None`, an initial set
                is selected via `get_inducing_pts`.
            **kwargs (Any):
                Extra keyword arguments forwarded to the objective constructor
                when `objective` is a string.
        """
        super().__init__(num_sensing, X_objective, kernel, noise_variance,
                         transform, num_robots, X_candidates, num_dim)
        self.transform = transform
        if X_init is None:
            X_init = get_inducing_pts(X_objective,
                                      num_sensing * self.num_robots)
        else:
            # Override num_dim with the dimensionality of the initial solution
            self.num_dim = X_init.shape[-1]

        self.X_init: np.ndarray = X_init.reshape(-1)  # Flattened initial guess

        if isinstance(objective, str):
            self.objective = get_objective(objective)(X_objective, kernel,
                                                      noise_variance, **kwargs)
        else:
            self.objective = objective

        # Use the convex hull of the objective inputs as a geometric bound.
        # Only the first two coordinates of each point are used.
        self.pbounds = geometry.MultiPoint([[p[0], p[1]]
                                            for p in X_objective]).convex_hull

    def update(self, kernel: gpflow.kernels.Kernel,
               noise_variance: float) -> None:
        """Update the kernel and noise variance used by the objective.

        Args:
            kernel (gpflow.kernels.Kernel):
                New GPflow kernel instance.
            noise_variance (float):
                New observation noise variance.
        """
        self.objective.update(kernel, noise_variance)

    def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
        """Return the current kernel and noise variance used by the objective.

        Returns:
            Tuple[gpflow.kernels.Kernel, float]:
                A deep copy of the kernel and the current noise variance.
        """
        return deepcopy(self.objective.kernel), \
               self.objective.noise_variance

    def optimize(self,
                 max_steps: int = 500,
                 tol: float = 1e-6,
                 verbose: bool = False,
                 seed: Optional[int] = None,
                 restarts: int = 5,
                 **kwargs: Any) -> np.ndarray:
        """Run CMA-ES to obtain informative sensing locations.

        Args:
            max_steps (int):
                Maximum number of objective function evaluations, passed to
                CMA-ES as `maxfevals` (not the number of iterations). Defaults
                to 500.
            tol (float):
                Function-value tolerance for termination (stopping criterion
                passed to CMA). Defaults to `1e-6`.
            verbose (bool):
                If `True`, CMA-ES prints progress messages.
            seed (int | None):
                Optional random seed for reproducibility.
            restarts (int):
                Number of CMA-ES restarts allowed. Defaults to 5.
            **kwargs (Any):
                Additional keyword arguments. Accepted for flexibility, but
                currently not forwarded to `cma.fmin2`.

        Returns:
            np.ndarray:
                Array of shape `(num_robots, num_sensing, num_dim)` containing the
                optimized sensing locations.
        """
        sigma0 = 1.0
        verbose = 1 if verbose else 0
        sol, _ = cma.fmin2(self._objective,
                           self.X_init,
                           sigma0,
                           options={
                               'maxfevals': max_steps,
                               'verb_disp': verbose,
                               'tolfun': tol,
                               'seed': seed
                           },
                           restarts=restarts)

        sol_np = np.array(sol).reshape(-1, self.num_dim)
        if self.transform is not None:
            sol_np = self.transform.expand(sol_np,
                                           expand_sensor_model=False)
            if not isinstance(sol_np, np.ndarray):
                sol_np = sol_np.numpy()

        # Snap to candidate set if provided
        if self.X_candidates is not None:
            sol_np = cont2disc(sol_np, self.X_candidates)

        sol_np = sol_np.reshape(self.num_robots, -1, self.num_dim)
        return sol_np

    def _objective(self, X: np.ndarray) -> float:
        """Objective function passed to CMA-ES (to be *minimized*).

        The internal objective (e.g., mutual information) is naturally
        maximized. CMA-ES, however, minimizes. To reconcile this, the method
        returns `-reward` (plus any constraint penalty), where `reward`
        is the value returned by the underlying objective.

        Steps:
        1. Reshape the flattened input `X` to `(num_points, num_dim)`.
        2. Optionally apply the transform (including constraints).
        3. Evaluate the GP-based objective (reward).
        4. Add the constraint penalty.
        5. Return the negative of this value as a Python float.

        Args:
            X (np.ndarray):
                Flattened array of length `num_sensing * num_robots * num_dim`
                containing the current candidate solution.

        Returns:
            float:
                Negative objective value to be minimized by CMA-ES. Large positive
                returns correspond to poor solutions; large negative returns
                correspond to good solutions.
        """
        X_reshaped = np.array(X).reshape(-1, self.num_dim)
        constraint_penalty = 0.0
        if self.transform is not None:
            X_expanded = self.transform.expand(X_reshaped)
            constraint_penalty = float(self.transform.constraints(X_reshaped))
            reward = self.objective(X_expanded)  # maximize
        else:
            reward = self.objective(X_reshaped)  # maximize

        # Convert to a Python float first (assumes a scalar tensor or number):
        # the fallback below is a plain float, so calling `.numpy()` on the
        # result afterwards would fail.
        reward = float(reward)
        if not np.isfinite(reward):
            reward = -1e6  # CMA does not handle inf values

        # Transform constraints are typically <= 0; adding them penalizes violations.
        reward += constraint_penalty
        return -reward  # CMA-ES minimizes

    def update_transform(self, transform: Transform) -> None:
        """Replace the transform used by the CMA-ES method.

        Args:
            transform (Transform):
                New `Transform` instance to use for expansion and constraints.
        """
        self.transform = transform

    def get_transform(self) -> Transform:
        """Return a deep copy of the transform used by this method.

        Returns:
            Transform:
                Deep copy of the current transform.
        """
        return deepcopy(self.transform)
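
The steps listed in the `_objective` docstring above (evaluate the reward, guard against non-finite values, add the constraint penalty, negate) can be sketched without any GP machinery. This is a simplified illustration under stated assumptions: `make_cost`, `reward_fn`, and `penalty_fn` are hypothetical names, and the toy reward and budget constraint are not part of the library.

```python
import math

def make_cost(reward_fn, penalty_fn=None, fallback=-1e6):
    """Wrap a reward to be maximized into a cost for a minimizer."""
    def cost(x):
        reward = float(reward_fn(x))
        if not math.isfinite(reward):
            reward = fallback  # keep the optimizer away from inf / nan
        if penalty_fn is not None:
            reward += float(penalty_fn(x))  # penalties are <= 0
        return -reward
    return cost

# Hypothetical reward and constraint: keep the sum of x within a budget of 2.
reward_fn = lambda x: math.log(1.0 + sum(x))
penalty_fn = lambda x: -10.0 * max(0.0, sum(x) - 2.0)
cost = make_cost(reward_fn, penalty_fn)

feasible = cost([0.5, 0.5])    # negative cost: a good solution
violating = cost([2.0, 2.0])   # penalty dominates, so the cost is positive
non_finite = make_cost(lambda x: float("nan"))([0.0])  # falls back to 1e6
```

Returning a plain float keeps the finite and fallback branches on the same type, which is the convention the docstring describes.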

__init__(num_sensing, X_objective, kernel, noise_variance, transform=None, num_robots=1, X_candidates=None, num_dim=None, objective='SLogMI', X_init=None, **kwargs)

Initialize a CMA-ES-based optimization method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| num_sensing | int | Number of sensing locations per robot. | required |
| X_objective | ndarray | Array of shape (n, d) used to define the GP objective and to build the convex hull bounds. | required |
| kernel | Kernel | GPflow kernel used inside the objective. | required |
| noise_variance | float | Observation noise variance used inside the objective. | required |
| transform | Transform \| None | Optional transform applied to candidate solutions before objective evaluation and constraints. | None |
| num_robots | int | Number of robots / agents. Defaults to 1. | 1 |
| X_candidates | ndarray \| None | Optional discrete candidate set of locations with shape (c, d). If provided, continuous solutions are snapped to candidates. | None |
| num_dim | int \| None | Dimensionality of sensing locations. If None, defaults to X_objective.shape[-1], or to X_init.shape[-1] if X_init is provided. | None |
| objective | str \| Objective | Objective specification, either a string key for get_objective or a pre-instantiated Objective. | 'SLogMI' |
| X_init | ndarray \| None | Initial guess for the sensing locations, with shape (num_sensing * num_robots, num_dim). If None, an initial set is selected via get_inducing_pts. | None |
| `**kwargs` | Any | Extra keyword arguments forwarded to the objective constructor when objective is a string. | {} |
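
To illustrate what "snapped to candidates" means for `X_candidates`, the sketch below replaces each continuous point with its nearest candidate. This is a plain nearest-neighbour assumption; the library's `cont2disc` is not shown on this page and may behave differently, for example in how it handles two points mapping to the same candidate. The name `snap_to_candidates` is hypothetical.

```python
import math

def snap_to_candidates(solution, candidates):
    """Replace each continuous point with its nearest candidate location."""
    return [min(candidates, key=lambda c: math.dist(p, c)) for p in solution]

X_candidates = [(0.0, 0.0), (1.0, 0.0), (0.0, 1.0), (1.0, 1.0)]
solution = [(0.1, 0.2), (0.9, 0.8), (0.6, 0.1)]
snapped = snap_to_candidates(solution, X_candidates)
```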

get_hyperparameters()

Return the current kernel and noise variance used by the objective.

Returns:

| Type | Description |
| --- | --- |
| Tuple[Kernel, float] | A deep copy of the kernel and the current noise variance. |
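
Since the kernel is returned as a deep copy, editing it does not change the objective until it is passed back through `update`. The sketch below shows that pattern with toy classes; `ToyKernel` and `ToyObjective` are hypothetical stand-ins, not GPflow or sgptools types.

```python
from copy import deepcopy

class ToyKernel:
    """Hypothetical stand-in for a kernel with a mutable hyperparameter."""
    def __init__(self, lengthscale):
        self.lengthscale = lengthscale

class ToyObjective:
    """Hypothetical stand-in holding a kernel and a noise variance."""
    def __init__(self, kernel, noise_variance):
        self.kernel = kernel
        self.noise_variance = noise_variance

    def update(self, kernel, noise_variance):
        self.kernel = kernel
        self.noise_variance = noise_variance

objective = ToyObjective(ToyKernel(1.0), 0.1)

# Same pattern as get_hyperparameters(): hand out a copy of the kernel.
kernel_copy, noise = deepcopy(objective.kernel), objective.noise_variance
kernel_copy.lengthscale = 2.5  # does not affect the objective yet

unchanged = objective.kernel.lengthscale
objective.update(kernel_copy, noise)  # changes take effect only via update()
updated = objective.kernel.lengthscale
```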


get_transform()

Return a deep copy of the transform used by this method.

Returns:

| Type | Description |
| --- | --- |
| Transform | Deep copy of the current transform. |


optimize(max_steps=500, tol=1e-06, verbose=False, seed=None, restarts=5, **kwargs)

Run CMA-ES to obtain informative sensing locations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_steps | int | Maximum number of objective function evaluations, passed to CMA-ES as maxfevals (not the number of iterations). Defaults to 500. | 500 |
| tol | float | Function-value tolerance for termination (stopping criterion passed to CMA). Defaults to 1e-6. | 1e-06 |
| verbose | bool | If True, CMA-ES prints progress messages. | False |
| seed | int \| None | Optional random seed for reproducibility. | None |
| restarts | int | Number of CMA-ES restarts allowed. Defaults to 5. | 5 |
| `**kwargs` | Any | Additional keyword arguments. Accepted for flexibility, but currently not forwarded to cma.fmin2. | {} |
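
Because `max_steps` is passed to CMA-ES as `maxfevals`, it caps objective evaluations rather than generations. A rough way to relate the two for a single run is sketched below, assuming pycma's documented default population size of 4 + floor(3 ln n) for an n-dimensional search vector. Restarts may use a different population size, so treat the result as an estimate only; `default_popsize` is a hypothetical helper.

```python
import math

def default_popsize(n):
    """Default CMA-ES population size, 4 + floor(3 ln n) (assumed from pycma)."""
    return 4 + int(3 * math.log(n))

num_sensing, num_robots, num_dim = 10, 1, 2
n = num_sensing * num_robots * num_dim  # length of the flattened vector
popsize = default_popsize(n)
max_steps = 500
approx_generations = max_steps // popsize  # generations within the budget
```

With these numbers the default budget covers a few dozen generations, so larger problems may need a larger `max_steps`.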

Returns:

| Type | Description |
| --- | --- |
| ndarray | Array of shape (num_robots, num_sensing, num_dim) containing the optimized sensing locations. |
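
The returned layout follows from reshaping the flat CMA-ES solution in row-major order. The sketch below reproduces that grouping with nested lists, as an illustration of how the flat vector maps to per-robot paths; `unflatten` is a hypothetical helper, and the method itself uses a NumPy reshape.

```python
def unflatten(flat, num_robots, num_dim):
    """Split a flat list into [robot][waypoint][coordinate] nested lists."""
    points = [flat[i:i + num_dim] for i in range(0, len(flat), num_dim)]
    per_robot = len(points) // num_robots
    return [points[r * per_robot:(r + 1) * per_robot]
            for r in range(num_robots)]

flat = [0, 0, 1, 1, 2, 2, 5, 5, 6, 6, 7, 7]  # 2 robots x 3 waypoints x 2D
paths = unflatten(flat, num_robots=2, num_dim=2)
```

Each robot's waypoints are contiguous in the flat vector, so `paths[1]` holds the second robot's path. Note that the source reshapes with `-1` for the middle axis, so the number of points per robot depends on what the transform's `expand` returns.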


update(kernel, noise_variance)

Update the kernel and noise variance used by the objective.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| kernel | Kernel | New GPflow kernel instance. | required |
| noise_variance | float | New observation noise variance. | required |

update_transform(transform)

Replace the transform used by the CMA-ES method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| transform | Transform | New Transform instance to use for expansion and constraints. | required |