Skip to content

GCBCover

sgptools.methods.GCBCover

Bases: GreedyCover

GCB coverage selection with an optional path-length budget.

This class extends GreedyCover by adding a travel constraint: it greedily selects sensing locations that improve GP-coverage, but only keeps additions whose resulting path (computed by a TSP solver) stays within a user-specified distance_budget.

Algorithm summary
  • Precompute boolean coverages for all candidate/objective pairs.
  • Initialize with the single candidate covering the most objective points.
  • Iteratively propose candidates using a generalized cost/benefit score: score = (newly-covered points) / (additional distance).
  • For each proposal, re-solve a TSP over the selected points and accept the new point only if the path length is <= distance_budget.
  • Stop when target coverage is met, the sensing budget is met, or no feasible improvements remain.
Notes
  • Current implementation assumes num_robots == 1.
  • The method may return fewer than num_sensing points due to the distance budget or early attainment of the coverage target.
  • If start_nodes are provided (kwargs), they are prepended to the output route; ensure the distance budget logic accounts for them (see code notes).
Source code in sgptools/methods.py
class GCBCover(GreedyCover):
    """GCB coverage selection with an optional path-length budget.

    This class extends `GreedyCover` by adding a travel constraint: it
    greedily selects sensing locations that improve GP-coverage, but only keeps
    additions whose resulting path (computed by a TSP solver) stays within a
    user-specified `distance_budget`.

    Algorithm summary:
        - Precompute boolean coverages for all candidate/objective pairs.
        - Initialize with the single candidate covering the most objective points.
        - Iteratively propose candidates using a generalized cost/benefit score:
             score = (newly-covered points) / (additional distance).
        - For each proposal, re-solve a TSP over the selected points and accept
           the new point only if the path length is <= `distance_budget`.
        - Stop when target coverage is met, the sensing budget is met, or no
           feasible improvements remain.

    Notes:
        - Current implementation assumes `num_robots == 1`.
        - The method may return fewer than `num_sensing` points due to the distance
          budget or early attainment of the coverage target.
        - If `start_nodes` are provided (kwargs), they are prepended to the output
          route; ensure the distance budget logic accounts for them (see code notes).
    """

    def optimize(self,
                 post_var_threshold: float = 0.7,
                 target_fraction: int = 100,
                 distance_budget: float = float("inf"),
                 return_fovs: bool = False,
                 slack_ratio: Optional[float] = None,
                 candidate_method: str = 'Hex',
                 X_warm_start: Optional[np.ndarray] = None,
                 **kwargs) -> np.ndarray:
        """Run the GCB selection with a path-length constraint.

        Args:
            post_var_threshold (float):
                Posterior variance upper bound used to binarize coverage (same meaning
                as in :meth:`GreedyCover.optimize`).
            target_fraction (int):
                Desired percent coverage in [0, 100]. The method stops once the number
                of covered objective points reaches `ceil(n_obj * target_fraction/100)`.
            distance_budget (float):
                Maximum allowed path length for the selected sensing locations (after
                TSP re-ordering). Use `inf` to disable the budget.
            return_fovs (bool):
                If True, also return polygon FoVs derived from covered objective points.
            slack_ratio (float | None):
                Non-negative slack used to lower the post_var_threshold when generating
                the candidate set, generating extra candidates and improving the chance
                of reaching the target coverage. If None, a default is chosen based on
                `candidate_method`; on each failed attempt the ratio is reduced by 0.1
                until the target is reached or the ratio drops below zero.
            candidate_method (str):
                Method used to generate the candidate set. Available options: `Hex` and `Grid`.
            X_warm_start (np.ndarray | None):
                Initial candidate locations to force inclusion. `None` (the default)
                is treated as an empty warm start.
            **kwargs (Any):
                Extra arguments forwarded to the TSP solver.

        Returns:
            np.ndarray | Tuple[np.ndarray, List[shapely.geometry.Polygon]]:
                X_sol:
                    Array shaped (num_robots, k, d) with k <= num_sensing selected points.
                (X_sol, fovs):
                    If return_fovs is True, also returns a list of shapely Polygons.
        """
        # Avoid a mutable default argument: normalize None to an empty warm start
        # (equivalent to the previous `X_warm_start=[]` default).
        if X_warm_start is None:
            X_warm_start = []

        if not hasattr(self, "coverages"):
            # Reduce the slack ratio (0.1 per retry) until the generated
            # candidate set can reach the target fraction, or give up once the
            # ratio drops below zero.

            if slack_ratio is not None:
                pass
            elif candidate_method == 'Hex':
                slack_ratio = 0.5
            elif candidate_method == 'Grid':
                slack_ratio = 0.9
            # NOTE(review): for any other `candidate_method`, `slack_ratio`
            # stays None and the decrement below raises a TypeError — confirm
            # whether additional candidate methods are expected here.

            max_fraction = float("-inf")
            while max_fraction < target_fraction:
                max_fraction = self._compute_coverage_maps(
                    post_var_threshold,
                    target_fraction,
                    slack_ratio,
                    candidate_method,
                    X_warm_start,
                    **kwargs,
                )

                # A negative max_fraction is treated as a hard failure signal.
                if max_fraction >= target_fraction or max_fraction < 0.:
                    break

                # Bug fix: the message previously said "increased" although the
                # ratio is decremented below.
                print("Failed to achieve target coverage. Retrying with reduced slack ratio...")
                slack_ratio -= 0.1
                if slack_ratio < 0.:
                    break

            if max_fraction < target_fraction or slack_ratio < 0.:
                raise ValueError(
                    f"Target coverage: {target_fraction:.2f}% is not achievable; "
                    f"maximum possible: {max_fraction:.2f}% with "
                    f"post_var_threshold: {post_var_threshold:.2f} and "
                    f"Final slack_ratio: {slack_ratio:.2f}."
                )

        # When a start node is provided, the TSP route includes it at index 0;
        # `offset` skips that entry when mapping route indices back to candidates.
        if kwargs.get('start_nodes', None) is not None:
            offset = 1
        else:
            offset = 0

        ws = len(X_warm_start)
        selected_idxs = []
        current_cover = np.zeros(self.X_objective.shape[0],
                                    dtype=bool)
        remaining = np.arange(len(self.X_candidates), dtype=np.int64)

        # Add warm start locations if available
        if ws:
            selected_idxs = list(range(ws))
            current_cover = np.any(self.coverages[selected_idxs], axis=0)
            current_sum = int(current_cover.sum())
            remaining = np.arange(ws, len(self.X_candidates),
                                  dtype=np.int64)

        # ----- Initial location: best single coverage -----
        single_areas = self.coverages[ws:].sum(axis=1)
        first_idx = ws + int(np.argmax(single_areas))
        selected_idxs.append(first_idx)
        distance = 0.0
        current_cover |= self.coverages[first_idx]
        current_sum = int(current_cover.sum())
        remaining = remaining[remaining != first_idx]

        # ----- GCB loop -----
        while (remaining.size > 0 and
               current_sum < self.target_sum and
               len(selected_idxs) < self.num_sensing):
            # Shift global candidate indices into the warm-start-free local frame
            remaining_local = (remaining - ws).astype(np.int64)
            selected_local = np.array(selected_idxs[ws:], dtype=np.int64) - ws

            # Compute deltas
            distance_deltas, area_deltas = _compute_deltas_all_numba(
                remaining_local,
                selected_local,
                self.X_candidates[ws:],
                self.coverages[ws:],
                current_cover,
                distance
            )

            # area gain per extra distance (guard against zero/negative distance)
            safe_dist = np.where(distance_deltas <= 0.0, 1e-9, distance_deltas)
            ratios = area_deltas / safe_dist

            # inner loop: find a feasible candidate under distance_budget
            while remaining.size > 0:
                pos = int(np.argmax(ratios))          # position in remaining/ratios
                best_idx = int(remaining[pos])        # GLOBAL candidate index
                best_ratio = ratios[pos]

                # Remove from pools by POSITION (aligned arrays)
                remaining = np.delete(remaining, pos)
                ratios = np.delete(ratios, pos)
                distance_deltas = np.delete(distance_deltas, pos)
                area_deltas = np.delete(area_deltas, pos)

                if best_ratio <= 0:
                    # No positive area/distance candidates left
                    remaining = np.array([], dtype=np.int64)
                    break

                # Recompute TSP path including this candidate
                idx_list = selected_idxs[ws:] + [best_idx]
                locs = self.X_candidates[idx_list]

                # run_tsp must be available in the module scope
                _, dist_list, indices_list = run_tsp(
                    locs,
                    initial_route=[list(range(1, len(locs) + 1))],
                    return_indices=True,
                    **kwargs
                )

                new_distance = dist_list[0]
                order = indices_list[0][offset:] - offset
                new_selected_idxs = [idx_list[i] for i in order]  # GLOBAL indices

                if new_distance <= distance_budget:
                    selected_idxs = selected_idxs[:ws] + new_selected_idxs
                    distance = new_distance

                    # Recompute coverage and area with new selection
                    current_cover |= self.coverages[best_idx]
                    current_sum = int(current_cover.sum())
                    break  # back to outer while

        # ----- Prepare outputs -----
        X_sol = self.X_candidates[selected_idxs[ws:]]
        start_nodes = kwargs.get('start_nodes', None)
        if start_nodes is not None:
            X_sol = np.vstack([start_nodes, X_sol])
        X_sol = np.array(X_sol).reshape(self.num_robots, -1, self.num_dim)

        # Plain greedy solution (no distance budget); used as a fallback below.
        X_sol_greedy = super(GCBCover, self).optimize(
                                        post_var_threshold=post_var_threshold,
                                        target_fraction=target_fraction,
                                        return_fovs=return_fovs,
                                        slack_ratio=slack_ratio,
                                        candidate_method=candidate_method,
                                        X_warm_start=X_warm_start,
                                        **kwargs)

        if return_fovs:
            fovs_greedy = X_sol_greedy[1]
            X_sol_greedy = X_sol_greedy[0]

        # Keep whichever solution yields the shorter path.
        if get_distance(X_sol_greedy[0]) < get_distance(X_sol[0]):
            X_sol = X_sol_greedy
            if return_fovs:
                fovs = fovs_greedy
        elif return_fovs:
            fovs = self._get_fovs(self.coverages[selected_idxs[ws:]])

        if return_fovs:
            return X_sol, fovs
        else:
            return X_sol

__init__(num_sensing, X_objective, kernel, noise_variance, transform=None, num_robots=1, X_candidates=None, num_dim=None, height=None, width=None, pbounds=None, **kwargs)

Initialize the method.

Parameters:

Name Type Description Default
num_sensing int

Maximum number of sensing locations (not strictly enforced; the tiling determines the actual number of points).

required
X_objective ndarray

Environment points. Used only to infer the bounding rectangle (min/max in the first two dimensions) when height/width are not provided.

required
kernel Kernel

GP kernel (assumed to have a variance and lengthscales attribute, e.g., SquaredExponential).

required
noise_variance float

Observation noise variance.

required
transform Transform | None

Reserved for compatibility with other methods.

None
num_robots int

Must be 1. Multi-robot tilings are not supported.

1
X_candidates ndarray | None

Ignored. Present for API compatibility with other methods.

None
num_dim int | None

Dimensionality of points. Defaults to X_objective.shape[-1].

None
height float | None

Environment height in the y-direction. If None, inferred from X_objective as y_max - y_min.

None
width float | None

Environment width in the x-direction. If None, inferred from X_objective as x_max - x_min.

None
pbounds ndarray | None

Coordinates of the environment boundary polygon, used to ensure all sensing locations are inside the environment.

None
**kwargs Any

Ignored. Accepted for forward compatibility.

{}
Source code in sgptools/methods.py
def __init__(self,
             num_sensing: int,
             X_objective: np.ndarray,
             kernel: gpflow.kernels.Kernel,
             noise_variance: float,
             transform: Optional[Transform] = None,
             num_robots: int = 1,
             X_candidates: Optional[np.ndarray] = None,
             num_dim: Optional[int] = None,
             height: Optional[float] = None,
             width: Optional[float] = None,
             pbounds: Optional[np.ndarray] = None,
             **kwargs: Any):
    """Set up the coverage planner and cache the environment geometry.

    Args:
        num_sensing (int):
            Maximum number of sensing locations (not strictly enforced; the
            tiling determines the actual number of points).
        X_objective (np.ndarray):
            Environment points; also used to infer the bounding rectangle
            (min/max over the first two dimensions) when `height`/`width`
            are omitted.
        kernel (gpflow.kernels.Kernel):
            GP kernel (assumed to expose `variance` and `lengthscales`
            attributes, e.g., SquaredExponential).
        noise_variance (float):
            Observation noise variance.
        transform (Transform | None):
            Reserved for compatibility with other methods.
        num_robots (int):
            Must be 1; multi-robot tilings are not supported.
        X_candidates (np.ndarray | None):
            Ignored; present for API compatibility with other methods.
        num_dim (int | None):
            Dimensionality of points. Defaults to `X_objective.shape[-1]`.
        height (float | None):
            Environment extent along y. Inferred from `X_objective`
            (`y_max - y_min`) when None.
        width (float | None):
            Environment extent along x. Inferred from `X_objective`
            (`x_max - x_min`) when None.
        pbounds (np.ndarray | None):
            Coordinates of the environment boundary polygon, used to keep
            all sensing locations inside the environment.
        **kwargs (Any):
            Ignored; accepted for forward compatibility.
    """
    super().__init__(num_sensing, X_objective, kernel, noise_variance,
                     transform, num_robots, X_candidates, num_dim)

    assert num_robots == 1, "HexCover only supports num_robots = 1."

    self.noise_variance = float(noise_variance)
    self.kernel = kernel

    # Keep the raw environment points (dtype reference / debugging).
    self.X_objective = np.asarray(X_objective)
    if self.X_objective.ndim != 2 or self.X_objective.shape[1] < 2:
        raise ValueError(
            "HexCover requires X_objective with at least 2 spatial dimensions."
        )

    # Axis-aligned bounding box of the environment in (x, y).
    lower = self.X_objective[:, :2].min(axis=0)
    upper = self.X_objective[:, :2].max(axis=0)
    extent = upper - lower

    self.origin = lower  # shift from [0, W] x [0, H] to actual coords
    self.width = float(width) if width is not None else float(extent[0])
    self.height = float(height) if height is not None else float(extent[1])

    # Optional boundary polygon constraining sensing locations.
    self.pbounds = geometry.Polygon(pbounds) if pbounds is not None else None

    # Cache scalar lengthscale and prior variance from the kernel.
    self._extract_kernel_scalars()

get_hyperparameters()

Return current kernel and noise variance as (kernel, noise_variance).

Returns:

Type Description
Tuple[Kernel, float]

Tuple[gpflow.kernels.Kernel, float]: A tuple containing the kernel instance and noise variance.

Source code in sgptools/methods.py
def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
    """Return a copy of the current hyperparameters.

    Returns:
        Tuple[gpflow.kernels.Kernel, float]:
            The kernel (deep-copied so callers cannot mutate internal state)
            and the observation noise variance as a float.
    """
    kernel_copy = deepcopy(self.kernel)
    noise = float(self.noise_variance)
    return kernel_copy, noise

optimize(post_var_threshold=0.7, target_fraction=100, distance_budget=float('inf'), return_fovs=False, slack_ratio=None, candidate_method='Hex', X_warm_start=[], **kwargs)

Run the GCB selection with a path-length constraint.

Parameters:

Name Type Description Default
post_var_threshold float

Posterior variance upper bound used to binarize coverage (same meaning as in :meth:GreedyCover.optimize).

0.7
target_fraction int

Desired percent coverage in [0, 100]. The method stops once the number of covered objective points reaches ceil(n_obj * target_fraction/100).

100
distance_budget float

Maximum allowed path length for the selected sensing locations (after TSP re-ordering). Use inf to disable the budget.

float('inf')
return_fovs bool

If True, also return polygon FoVs derived from covered objective points.

False
slack_ratio float | None

Non-negative slack used to lower the post_var_threshold when generating the candidate set, generating extra candidates and improving the chance of reaching the target coverage.

None
candidate_method str

Method used to generate the candidate set. Available options: Hex and Grid.

'Hex'
X_warm_start ndarray

Initial candidate locations to force inclusion.

[]
**kwargs Any

Extra arguments forwarded to the TSP solver.

{}

Returns:

Type Description
ndarray

np.ndarray | Tuple[np.ndarray, List[shapely.geometry.Polygon]]: X_sol: Array shaped (num_robots, k, d) with k <= num_sensing selected points. (X_sol, fovs): If return_fovs is True, also returns a list of shapely Polygons.

Source code in sgptools/methods.py
def optimize(self,
             post_var_threshold: float = 0.7,
             target_fraction: int = 100,
             distance_budget: float = float("inf"),
             return_fovs: bool = False,
             slack_ratio: float = None,
             candidate_method: str = 'Hex',
             X_warm_start: np.ndarray = [],
             **kwargs) -> np.ndarray:
    """Run the GCB selection with a path-length constraint.

    Args:
        post_var_threshold (float):
            Posterior variance upper bound used to binarize coverage (same meaning
            as in :meth:`GreedyCover.optimize`).
        target_fraction (int):
            Desired percent coverage in [0, 100]. The method stops once the number
            of covered objective points reaches `ceil(n_obj * target_fraction/100)`.
        distance_budget (float):
            Maximum allowed path length for the selected sensing locations (after
            TSP re-ordering). Use `inf` to disable the budget.
        return_fovs (bool):
            If True, also return polygon FoVs derived from covered objective points.
        slack_ratio (float | None):
            Non-negative slack used to lower the post_var_threshold when generating 
            the candidate set, generating extra candidates and improving the chance of 
            reaching the target coverage.
        candidate_method (str): 
            Method used to generate the candidate set. Available options: `Hex` and `Grid`.
        X_warm_start (np.ndarray):
            Initial candidate locations to force inclusion.
        **kwargs (Any):
            Extra arguments forwarded to the TSP solver.

    Returns:
        np.ndarray | Tuple[np.ndarray, List[shapely.geometry.Polygon]]:
            X_sol:
                Array shaped (num_robots, k, d) with k <= num_sensing selected points.
            (X_sol, fovs):
                If return_fovs is True, also returns a list of shapely Polygons.
    """
    # NOTE(review): `X_warm_start=[]` is a mutable default argument. It is
    # never mutated in this body, so it is harmless here, but a `None`
    # default would be safer — confirm before changing the signature.
    if not hasattr(self, "coverages"):
        # Reduce the slack ratio (0.1 per retry) until the candidate set can
        # reach the target fraction, or give up once the ratio drops below 0.
        # NOTE(review): the retry message below says "increased" while the
        # code decrements `slack_ratio` — confirm the intended wording.

        if slack_ratio is not None:
            pass
        elif candidate_method=='Hex':
            slack_ratio = 0.5
        elif candidate_method=='Grid':
            slack_ratio = 0.9
        # NOTE(review): any other `candidate_method` leaves `slack_ratio`
        # as None, so the decrement below raises a TypeError.

        max_fraction = float("-inf")
        while max_fraction < target_fraction:
            max_fraction = self._compute_coverage_maps(
                post_var_threshold,
                target_fraction,
                slack_ratio,
                candidate_method,
                X_warm_start,
                **kwargs,
            )

            # A negative max_fraction is treated as a hard failure signal.
            if max_fraction >= target_fraction or max_fraction < 0.:
                break

            print("Failed to achieve target coverage. Retrying with increased slack ratio...")
            slack_ratio -= 0.1
            if slack_ratio < 0.:
                break

        if max_fraction < target_fraction or slack_ratio < 0.:
            raise ValueError(
                f"Target coverage: {target_fraction:.2f}% is not achievable; "
                f"maximum possible: {max_fraction:.2f}% with "
                f"post_var_threshold: {post_var_threshold:.2f} and "
                f"Final slack_ratio: {slack_ratio:.2f}."
            )

    # When a start node is provided, the TSP route presumably includes it at
    # index 0; `offset` skips that entry when mapping route indices back to
    # candidate indices — TODO confirm against run_tsp's return convention.
    if kwargs.get('start_nodes', None) is not None:
        offset = 1
    else:
        offset = 0

    ws = len(X_warm_start)
    selected_idxs = []
    current_cover = np.zeros(self.X_objective.shape[0], 
                                dtype=bool)
    remaining = np.arange(len(self.X_candidates), dtype=np.int64)

    # Add warm start locations if available
    if ws:
        selected_idxs = list(range(ws))
        current_cover = np.any(self.coverages[selected_idxs], axis=0)
        current_sum = int(current_cover.sum())
        remaining = np.arange(ws, len(self.X_candidates),
                              dtype=np.int64)

    # ----- Initial location: best single coverage -----
    single_areas = self.coverages[ws:].sum(axis=1)
    first_idx = ws + int(np.argmax(single_areas))
    selected_idxs.append(first_idx)
    distance = 0.0
    current_cover |= self.coverages[first_idx]
    current_sum = int(current_cover.sum())
    remaining = remaining[remaining != first_idx]

    # ----- GCB loop -----
    while (remaining.size > 0 and
           current_sum < self.target_sum and
           len(selected_idxs) < self.num_sensing):
        # Shift global candidate indices into the warm-start-free local frame
        remaining_local = (remaining - ws).astype(np.int64)
        selected_local = np.array(selected_idxs[ws:], dtype=np.int64) - ws

        # Compute deltas
        distance_deltas, area_deltas = _compute_deltas_all_numba(
            remaining_local, 
            selected_local, 
            self.X_candidates[ws:], 
            self.coverages[ws:],
            current_cover, 
            distance
        )

        # area gain per extra distance (guard against zero/negative distance)
        safe_dist = np.where(distance_deltas <= 0.0, 1e-9, distance_deltas)
        ratios = area_deltas / safe_dist

        # inner loop: find a feasible candidate under distance_budget
        while remaining.size > 0:
            pos = int(np.argmax(ratios))          # position in remaining/ratios
            best_idx = int(remaining[pos])        # GLOBAL candidate index
            best_ratio = ratios[pos]

            # Remove from pools by POSITION (aligned arrays)
            remaining = np.delete(remaining, pos)
            ratios = np.delete(ratios, pos)
            distance_deltas = np.delete(distance_deltas, pos)
            area_deltas = np.delete(area_deltas, pos)

            if best_ratio <= 0:
                # No positive area/distance candidates left
                remaining = np.array([], dtype=np.int64)
                break

            # Recompute TSP path including this candidate
            idx_list = selected_idxs[ws:] + [best_idx]
            locs = self.X_candidates[idx_list]

            # run_tsp must be available in the module scope
            _, dist_list, indices_list = run_tsp(
                locs,
                initial_route=[list(range(1, len(locs) + 1))],
                return_indices=True,
                **kwargs
            )

            new_distance = dist_list[0]
            order = indices_list[0][offset:] - offset
            new_selected_idxs = [idx_list[i] for i in order]  # GLOBAL indices

            if new_distance <= distance_budget:
                selected_idxs = selected_idxs[:ws] + new_selected_idxs
                distance = new_distance

                # Recompute coverage and area with new selection
                current_cover |= self.coverages[best_idx]
                current_sum = int(current_cover.sum())
                break  # back to outer while

    # ----- Prepare outputs -----
    X_sol = self.X_candidates[selected_idxs[ws:]]
    start_nodes = kwargs.get('start_nodes', None)
    if start_nodes is not None:
        X_sol = np.vstack([start_nodes, X_sol])
    X_sol = np.array(X_sol).reshape(self.num_robots, -1, self.num_dim)

    # Plain greedy solution (no distance budget); used as a fallback below.
    X_sol_greedy = super(GCBCover, self).optimize(
                                    post_var_threshold=post_var_threshold,
                                    target_fraction=target_fraction,
                                    return_fovs=return_fovs,
                                    slack_ratio=slack_ratio,
                                    candidate_method=candidate_method,
                                    X_warm_start=X_warm_start,
                                    **kwargs)

    if return_fovs:
        fovs_greedy = X_sol_greedy[1]
        X_sol_greedy = X_sol_greedy[0]

    # Keep whichever solution yields the shorter path.
    if get_distance(X_sol_greedy[0]) < get_distance(X_sol[0]):
        X_sol = X_sol_greedy
        if return_fovs:
            fovs = fovs_greedy
    elif return_fovs:
        fovs = self._get_fovs(self.coverages[selected_idxs[ws:]])

    if return_fovs:
        return X_sol, fovs
    else:
        return X_sol

update(kernel, noise_variance)

Update kernel and noise variance hyperparameters.

Parameters:

Name Type Description Default
kernel Kernel

New GPflow kernel instance.

required
noise_variance float

New observation noise variance.

required
Source code in sgptools/methods.py
def update(self,
           kernel: gpflow.kernels.Kernel,
           noise_variance: float) -> None:
    """Replace the GP hyperparameters and refresh derived scalars.

    Args:
        kernel (gpflow.kernels.Kernel):
            New GPflow kernel instance.
        noise_variance (float):
            New observation noise variance.
    """
    self.noise_variance = float(noise_variance)
    self.kernel = kernel
    # Re-derive the cached scalar lengthscale/variance from the new kernel.
    self._extract_kernel_scalars()