benchmark

The benchmark module contains classes for evaluating the performance of a model on a dataset.

BaseEvaluator

BaseEvaluator(
    task: Literal[
        "trajectory_regression",
        "regression",
        "poi_prediction",
        "mobility_prediction",
    ],
)

Bases: ABC

Abstract class for benchmark evaluators.

Source code in srai/benchmark/_base.py
def __init__(
    self,
    task: Literal[
        "trajectory_regression", "regression", "poi_prediction", "mobility_prediction"
    ],
) -> None:
    self.task = task

evaluate

abstractmethod
evaluate(
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: np.ndarray,
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any
) -> dict[str, float]

Evaluate predictions against the test set.

PARAMETER DESCRIPTION
dataset

Dataset to evaluate on.

TYPE: HuggingFaceDataset

predictions

Predictions returned by your model.

TYPE: ndarray

log_metrics

If True, logs metrics to the console. Defaults to True.

TYPE: bool DEFAULT: True

hf_token

If needed, a User Access Token used to authenticate to Hugging Face. Defaults to None.

TYPE: str DEFAULT: None

**kwargs

Additional keyword arguments depending on the task.

TYPE: Any DEFAULT: {}

PARAMETER DESCRIPTION
region_ids

List of region IDs. Required for region-based evaluators.

TYPE: list[str]

point_of_interests

Points of interest. Required for point-based evaluators.

TYPE: ndarray

RETURNS DESCRIPTION
dict[str, float]

dict[str, float]: Dictionary with metric values for the task.

Note

Specific subclasses may require different sets of keyword arguments.

Source code in srai/benchmark/_base.py
@abc.abstractmethod
def evaluate(
    self,
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: np.ndarray,
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any,
) -> dict[str, float]:
    """
    Evaluate predictions against the test set.

    Args:
        dataset (sds.HuggingFaceDataset): Dataset to evaluate on.
        predictions (np.ndarray): Predictions returned by your model.
        log_metrics (bool, optional): If True, logs metrics to the console. Defaults to True.
        hf_token (str, optional): If needed, a User Access Token used to authenticate to
            Hugging Face. Defaults to None.
        **kwargs: Additional keyword arguments depending on the task.

    Keyword Args:
        region_ids (list[str], optional): List of region IDs. Required for region-based\
              evaluators.
        point_of_interests (np.ndarray, optional): Points of interest. Required for point-based\
            evaluators.

    Returns:
        dict[str, float]: Dictionary with metric values for the task.

    Note:
        Specific subclasses may require different sets of keyword arguments.
    """
    raise NotImplementedError
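
Example: subclassing BaseEvaluator (illustrative sketch, not part of srai). The import paths (srai.benchmark, srai.datasets as sds) are inferred from the source locations shown on this page, the _compute_metrics and _log_metrics helpers are assumed to be provided by the base class (the built-in evaluators below call them), and the labels keyword argument is hypothetical.

from typing import Any, Optional

import numpy as np

import srai.datasets as sds  # assumed alias matching `sds` in the listings above
from srai.benchmark import BaseEvaluator  # assumed public import path


class MyRegressionEvaluator(BaseEvaluator):
    """Toy evaluator comparing predictions against labels passed as a keyword argument."""

    def __init__(self) -> None:
        super().__init__(task="regression")

    def evaluate(
        self,
        dataset: sds.PointDataset | sds.TrajectoryDataset,
        predictions: np.ndarray,
        log_metrics: bool = True,
        hf_token: Optional[str] = None,
        **kwargs: Any,
    ) -> dict[str, float]:
        # `labels` is a hypothetical keyword argument used for illustration only.
        labels = np.asarray(kwargs["labels"])
        # _compute_metrics and _log_metrics are assumed base-class helpers,
        # as used by the built-in evaluators documented below.
        metrics = self._compute_metrics(predictions, labels)
        if log_metrics:
            self._log_metrics(metrics)
        return metrics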

HexRegressionEvaluator

HexRegressionEvaluator()

Bases: BaseEvaluator

Evaluator for regression task.

Source code in srai/benchmark/hex_regression_evaluator.py
def __init__(self) -> None:
    """Create the evaluator."""
    super().__init__(task="regression")

evaluate

evaluate(
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: np.ndarray,
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any
) -> dict[str, float]

Evaluate regression predictions against the test set.

This regression evaluator is designed for H3 grid predictions. Metrics are calculated for each H3 region where at least one data point is present (empty regions are not taken into account).

PARAMETER DESCRIPTION
dataset

Dataset to evaluate.

TYPE: PointDataset

predictions

Predictions returned by your model. Should match region_ids.

TYPE: ndarray

log_metrics

If True, logs metrics to the console. Defaults to True.

TYPE: bool DEFAULT: True

hf_token

If needed, a User Access Token used to authenticate to Hugging Face. Defaults to None.

TYPE: str DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

PARAMETER DESCRIPTION
region_ids

List of region IDs. Required for region-based evaluators.

TYPE: list[str]

RAISES DESCRIPTION
ValueError

If a region ID for an H3 index is not found in region_ids.

RETURNS DESCRIPTION
dict[str, float]

dict[str, float]: Dictionary with metric values for the task.

Source code in srai/benchmark/hex_regression_evaluator.py
def evaluate(
    self,
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: np.ndarray,
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any,
) -> dict[str, float]:
    """
    Evaluate regression predictions against the test set.

    This regression evaluator is designed for H3 grid predictions. Metrics are calculated for
    each H3 region where at least one data point is present (empty regions are not taken into
    account).

    Args:
        dataset (sds.PointDataset): Dataset to evaluate.
        predictions (np.ndarray): Predictions returned by your model. Should match region_ids.
        log_metrics (bool, optional): If True, logs metrics to the console. Defaults to True.
        hf_token (str, optional): If needed, a User Access Token used to authenticate to
            Hugging Face. Defaults to None.
        **kwargs: Additional keyword arguments.

    Keyword Args:
        region_ids (list[str]): List of region IDs. Required for region-based evaluators.

    Raises:
        ValueError: If a region ID for an H3 index is not found in region_ids.

    Returns:
        dict[str, float]: Dictionary with metric values for the task.
    """
    if not isinstance(dataset, sds.PointDataset):
        raise ValueError("This evaluator only supports PointDataset.")
    region_ids = kwargs.get("region_ids")

    if region_ids is None:
        raise ValueError("Region_ids are required for region-based evaluation.")

    target_column = dataset.target if dataset.target is not None else "count"
    _, _, h3_test = dataset.get_h3_with_labels()

    if h3_test is None:
        raise ValueError("The function 'get_h3_with_labels' returned None for h3_test.")
    else:
        h3_test = h3_test.reset_index()
        h3_indexes = h3_test["region_id"].to_list()
        labels = h3_test[target_column].to_numpy()

    region_to_prediction = {
        region_id: prediction for region_id, prediction in zip(region_ids, predictions)
    }

    available_h3_indexes = [h3 for h3 in h3_indexes if h3 in region_to_prediction]

    missing_h3_indexes = set(h3_indexes) - set(available_h3_indexes)
    if missing_h3_indexes:
        logging.info(
            f"{len(missing_h3_indexes)} H3 indexes from the test set have no matching "
            "prediction in the given region_ids and will be skipped during evaluation. "
            f"Measuring for {len(available_h3_indexes)} indexes."
        )

    # Reorder labels and predictions accordingly
    filtered_labels = np.array(
        [label for h3, label in zip(h3_indexes, labels) if h3 in region_to_prediction]
    )
    ordered_predictions = np.array([region_to_prediction[h3] for h3 in available_h3_indexes])

    metrics = self._compute_metrics(ordered_predictions, filtered_labels)
    if log_metrics:
        self._log_metrics(metrics)
    return metrics
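
Example usage (illustrative sketch). The import path is inferred from the source location above, my_point_dataset is a placeholder for an already-loaded sds.PointDataset, and the H3 indexes and prediction values are made up; the metric names in the returned dictionary depend on the evaluator's _compute_metrics helper.

import numpy as np

from srai.benchmark import HexRegressionEvaluator  # assumed public import path

# predictions[i] must correspond to region_ids[i]; only H3 regions present in the
# test split are scored, the rest are skipped with an informational log message.
region_ids = ["8928308280fffff", "8928308280bffff"]  # example H3 cell ids
predictions = np.array([12.3, 4.7])

evaluator = HexRegressionEvaluator()
metrics = evaluator.evaluate(
    my_point_dataset,       # placeholder: an already-loaded sds.PointDataset
    predictions,
    log_metrics=True,
    region_ids=region_ids,  # required keyword argument for this evaluator
)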

MobilityPredictionEvaluator

MobilityPredictionEvaluator(k: int = np.inf)

Bases: BaseEvaluator

Evaluator for models predicting H3 index trajectories directly.

k (int): If set, only the first k elements of each sequence are used for metrics computation. Defaults to np.inf (use full sequences).

Source code in srai/benchmark/mobility_prediction_evaluator.py
def __init__(self, k: int = np.inf) -> None:
    """
    Create the evaluator.

    Args:
        k (int): If set, only the first k elements of each sequence are used for metrics
            computation. Defaults to np.inf (use full sequences).
    """
    self.k = k
    super().__init__(task="mobility_prediction")
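
Example construction (illustrative sketch; the import path is inferred from the source location above):

from srai.benchmark import MobilityPredictionEvaluator  # assumed public import path

# Compare only the first 3 predicted/true H3 cells of every trajectory.
top3_evaluator = MobilityPredictionEvaluator(k=3)

# Default: compare full sequences (k = np.inf).
full_evaluator = MobilityPredictionEvaluator()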

evaluate

evaluate(
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: list[list[str]],
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any
) -> dict[str, float]

Evaluate predicted H3 index sequences against ground truth H3 sequences.

PARAMETER DESCRIPTION
dataset

Dataset to evaluate.

TYPE: TrajectoryDataset

predictions

Predicted sequences of H3 indexes.

TYPE: List[List[str]]

log_metrics

If True, logs metrics.

TYPE: bool DEFAULT: True

hf_token

Ignored.

TYPE: str DEFAULT: None

**kwargs
  • trip_ids (List[str]): List of trip IDs corresponding to predictions.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
dict[str, float]

dict[str, float]: Evaluation metrics.

Source code in srai/benchmark/mobility_prediction_evaluator.py
def evaluate(
    self,
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: list[list[str]],
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any,
) -> dict[str, float]:
    """
    Evaluate predicted H3 index sequences against ground truth H3 sequences.

    Args:
        dataset (TrajectoryDataset): Dataset to evaluate.
        predictions (List[List[str]]): Predicted sequences of H3 indexes.
        log_metrics (bool): If True, logs metrics.
        hf_token (str, optional): Ignored.
        **kwargs:
            - trip_ids (List[str]): List of trip IDs corresponding to predictions.

    Returns:
        dict[str, float]: Evaluation metrics.
    """
    if not isinstance(dataset, sds.TrajectoryDataset):
        raise ValueError("This evaluator only supports TrajectoryDataset.")

    trip_ids = kwargs.get("trip_ids")
    if trip_ids is None:
        raise ValueError("`trip_ids` are required for trajectory evaluation.")

    _, _, h3_test = dataset.get_h3_with_labels()
    if h3_test is None:
        raise ValueError("The function 'get_h3_with_labels' returned None for h3_test.")

    trip_id_col = dataset.target if dataset.target is not None else "trip_id"
    h3_col = "h3_sequence_y"  # Adjust if this column name differs

    # Map predictions to their corresponding trip ID
    trip_to_prediction = {
        int(trip_id): prediction for trip_id, prediction in zip(trip_ids, predictions)
    }
    trip_to_prediction_keys = trip_to_prediction.keys()

    all_trip_ids = set(map(int, h3_test[trip_id_col].unique()))
    available_trip_ids = set(trip_to_prediction_keys).intersection(all_trip_ids)
    missing_trip_ids = set(trip_to_prediction_keys).difference(available_trip_ids)

    if missing_trip_ids:
        logging.info(
            f"{len(missing_trip_ids)} trip_ids have no matching data in the test set "
            f"and will be skipped. Evaluating {len(available_trip_ids)} trip(s)."
        )

    if not available_trip_ids:
        raise ValueError("No matching trip ids found in test dataset.")

    # Build filtered true sequences and predictions
    true_sequences = []
    filtered_predictions = []

    for trip_id in available_trip_ids:
        trip_df = h3_test[h3_test[trip_id_col] == trip_id]
        true_h3_seq = trip_df[h3_col].iloc[0]
        pred_h3_seq = trip_to_prediction[trip_id]

        true_sequences.append(true_h3_seq)
        filtered_predictions.append(pred_h3_seq)

    # Compute metrics
    metrics = self._compute_metrics(true_sequences, filtered_predictions, self.k)

    if log_metrics:
        self._log_metrics(metrics)

    return metrics
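
Example usage (illustrative sketch). my_trajectory_dataset is a placeholder for a loaded sds.TrajectoryDataset, the trip ids and H3 sequences are made up, and the import path is inferred from the source location above. Note that the evaluator casts trip ids to int internally, so they must be int-convertible.

from srai.benchmark import MobilityPredictionEvaluator  # assumed public import path

# predictions[i] is the predicted H3 sequence for trip_ids[i].
trip_ids = ["101", "102"]
predictions = [
    ["8928308280fffff", "8928308280bffff", "89283082807ffff"],
    ["8928308280bffff"],
]

evaluator = MobilityPredictionEvaluator(k=5)  # score at most the first 5 steps
metrics = evaluator.evaluate(
    my_trajectory_dataset,  # placeholder: an already-loaded sds.TrajectoryDataset
    predictions,
    trip_ids=trip_ids,      # required keyword argument for this evaluator
)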

TrajectoryRegressionEvaluator

TrajectoryRegressionEvaluator()

Bases: BaseEvaluator

Evaluator for the trajectory regression task.

Source code in srai/benchmark/trajectory_regression_evaluator.py
def __init__(self) -> None:
    """Create the evaluator."""
    super().__init__(task="trajectory_regression")

evaluate

evaluate(
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: np.ndarray,
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any
) -> dict[str, float]

Evaluate regression predictions against the test set.

This regression evaluator is designed for predictions on H3 grid trajectories.

PARAMETER DESCRIPTION
dataset

Dataset to evaluate.

TYPE: TrajectoryDataset

predictions

Predictions returned by your model. Should match trip_ids.

TYPE: ndarray

log_metrics

If True, logs metrics to the console. Defaults to True.

TYPE: bool DEFAULT: True

hf_token

If needed, a User Access Token used to authenticate to Hugging Face. Defaults to None.

TYPE: str DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

PARAMETER DESCRIPTION
trip_ids

List of trip IDs. Required for trajectory-based evaluators.

TYPE: list[str]

RAISES DESCRIPTION
ValueError

If no matching trip IDs are found in the test dataset.

RETURNS DESCRIPTION
dict[str, float]

dict[str, float]: Dictionary with metric values for the task.

Source code in srai/benchmark/trajectory_regression_evaluator.py
def evaluate(
    self,
    dataset: sds.PointDataset | sds.TrajectoryDataset,
    predictions: np.ndarray,
    log_metrics: bool = True,
    hf_token: Optional[str] = None,
    **kwargs: Any,
) -> dict[str, float]:
    """
    Evaluate regression predictions against the test set.

    This regression evaluator is designed for predictions on H3 grid trajectories.

    Args:
        dataset (sds.TrajectoryDataset): Dataset to evaluate.
        predictions (np.ndarray): Predictions returned by your model. Should match trip_ids.
        log_metrics (bool, optional): If True, logs metrics to the console. Defaults to True.
        hf_token (str, optional): If needed, a User Access Token used to authenticate to
            Hugging Face. Defaults to None.
        **kwargs: Additional keyword arguments.

    Keyword Args:
        trip_ids (list[str]): List of trip IDs. Required for trajectory-based evaluators.

    Raises:
        ValueError: If no matching trip IDs are found in the test dataset.

    Returns:
        dict[str, float]: Dictionary with metric values for the task.
    """
    if not isinstance(dataset, sds.TrajectoryDataset):
        raise ValueError("This evaluator only supports TrajectoryDataset.")

    if dataset.version != "TTE":
        raise ValueError(
            "Trajectory Regression Evaluator is made for regression tasks "
            "such as Travel Time Estimation (TTE). Your dataset version is "
            f"preprocessed for task: {dataset.version}"
        )
    trip_ids = kwargs.get("trip_ids")

    if trip_ids is None:
        raise ValueError("Trip_ids are required for trajectory based evaluation.")

    _, _, h3_test = dataset.get_h3_with_labels()

    if h3_test is None:
        raise ValueError("The function 'get_h3_with_labels' returned None for h3_test.")
    else:
        trip_indexes = [int(idx) for idx in h3_test[dataset.target].to_list()]
        labels = h3_test["duration"].to_numpy()

    trip_to_prediction = {
        trip_id: prediction for trip_id, prediction in zip(trip_ids, predictions)
    }
    # Keep test-set order so that labels and predictions stay aligned.
    available_trip_indexes = [idx for idx in trip_indexes if idx in trip_to_prediction]
    missing_trip_indexes = set(trip_indexes).difference(available_trip_indexes)

    if missing_trip_indexes:
        logging.info(
            f"{len(missing_trip_indexes)} trip indexes from the test set have no matching "
            "trip_ids in the given predictions and will be skipped in evaluation. "
            f"Measuring for {len(available_trip_indexes)} indexes."
        )

    # Reorder labels and predictions accordingly
    if available_trip_indexes:
        filtered_labels = np.array(
            [label for idx, label in zip(trip_indexes, labels) if idx in trip_to_prediction]
        )
        ordered_predictions = np.array(
            [trip_to_prediction[idx] for idx in available_trip_indexes]
        )

        trip_ids[:] = available_trip_indexes
        predictions = ordered_predictions

        metrics = self._compute_metrics(predictions, filtered_labels)
        if log_metrics:
            self._log_metrics(metrics)
        return metrics
    else:
        raise ValueError("No matching trip ids found in test dataset")