Skip to content

PortoTaxiDataset

srai.datasets.PortoTaxiDataset

PortoTaxiDataset()

Bases: TrajectoryDataset

Porto Taxi dataset.

The dataset covers a year of trajectory data for taxis in Porto, Portugal. Each ride is categorized as: A) taxi central based, B) stand-based or C) non-taxi central based. Each data point represents a completed trip initiated through the dispatch central, a taxi stand, or a random street.

Source code in srai/datasets/porto_taxi.py
def __init__(self) -> None:
    """Initialise the Porto Taxi dataset with its Hugging Face path and column roles."""
    # All configuration is static, so it is passed straight to the base class.
    super().__init__(
        "kraina/porto_taxi",
        type="trajectory",
        numerical_columns=["speed"],
        categorical_columns=["call_type", "origin_call", "origin_stand", "day_type"],
        target="trip_id",
    )

get_h3_with_labels

get_h3_with_labels() -> (
    tuple[
        gpd.GeoDataFrame, Optional[gpd.GeoDataFrame], Optional[gpd.GeoDataFrame]
    ]
)

Returns ids, h3 indexes sequences, with target labels from the dataset.

Points are aggregated to hex trajectories and target column values are calculated for each trajectory (time duration for TTE task, future movement sequence for HMP task).

RETURNS DESCRIPTION
tuple[GeoDataFrame, Optional[GeoDataFrame], Optional[GeoDataFrame]]

tuple[gpd.GeoDataFrame, Optional[gpd.GeoDataFrame], Optional[gpd.GeoDataFrame]]: Train, Val, Test hexes sequences with target labels in GeoDataFrames

Source code in srai/datasets/_base.py
def get_h3_with_labels(
    self,
) -> tuple[gpd.GeoDataFrame, Optional[gpd.GeoDataFrame], Optional[gpd.GeoDataFrame]]:
    """
    Return trajectory ids and H3 index sequences with target labels for each split.

    The selected columns depend on `self.version`: `h3_sequence` and `duration` for the
    TTE task, `h3_sequence_x` and `h3_sequence_y` for the HMP task. The column named by
    `self.target` is always included first.

    Returns:
        tuple[gpd.GeoDataFrame, Optional[gpd.GeoDataFrame], Optional[gpd.GeoDataFrame]]: Train,
            Val, Test hexes sequences with target labels in GeoDataFrames. Val and Test are
            None when the corresponding split is not set on the dataset.

    Raises:
        ValueError: If `self.resolution` is None, or if `self.version` is not one of
            'TTE', 'HMP' or 'all'.
        TypeError: If `self.version` is 'all', which carries no target labels.
    """
    assert self.train_gdf is not None
    if self.resolution is None:
        raise ValueError(
            "No preset resolution for the dataset in self.resolution. Please "
            "provide a resolution."
        )

    if self.version == "TTE":
        label_columns = ["h3_sequence", "duration"]
    elif self.version == "HMP":
        label_columns = ["h3_sequence_x", "h3_sequence_y"]
    elif self.version == "all":
        raise TypeError(
            "Could not provide target labels, as version 'all' "
            "of dataset does not provide one."
        )
    else:
        # Without this branch an unknown version would surface as an UnboundLocalError
        # at the return statement instead of a meaningful message.
        raise ValueError(f"Unsupported dataset version: {self.version!r}")

    columns = [self.target, *label_columns]

    _train_gdf = self.train_gdf[columns]
    # Validation and test splits are optional; propagate None when a split is absent.
    _val_gdf = self.val_gdf[columns] if self.val_gdf is not None else None
    _test_gdf = self.test_gdf[columns] if self.test_gdf is not None else None

    return _train_gdf, _val_gdf, _test_gdf

load

load(
    version: Optional[Union[int, str]] = "TTE",
    hf_token: Optional[str] = None,
    resolution: Optional[int] = None,
) -> dict[str, gpd.GeoDataFrame]

Method to load dataset.

PARAMETER DESCRIPTION
hf_token

A User Access Token used to authenticate to the Hugging Face Hub, if needed. The environment variable HF_TOKEN can also be used. Defaults to None.

TYPE: Optional[str] DEFAULT: None

version

Version of the dataset. Available: official train-test split for the Travel Time Estimation task (TTE) and the Human Mobility Prediction task (HMP). Raw data is available as: 'all'.

TYPE: Optional[Union[int, str]] DEFAULT: 'TTE'

resolution

H3 resolution for hex trajectories. Necessary if using 'all' split.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
dict[str, GeoDataFrame]

dict[str, gpd.GeoDataFrame]: Dictionary with all splits loaded from the dataset. Will contain keys "train" and "test" if available.

Source code in srai/datasets/porto_taxi.py
def load(
    self,
    version: Optional[Union[int, str]] = "TTE",
    hf_token: Optional[str] = None,
    resolution: Optional[int] = None,
) -> dict[str, gpd.GeoDataFrame]:
    """
    Load the dataset in the requested version.

    Sets `self.resolution` before delegating to the base class loader: the 'TTE' and
    'HMP' versions always use resolution 9, while 'all' stores the `resolution` argument
    as given (including None).

    Args:
        hf_token (Optional[str]): A User Access Token used to authenticate to the
            Hugging Face Hub, if needed. The environment variable `HF_TOKEN` can also
            be used. Defaults to None.
        version (Optional[Union[int, str]]): Version of the dataset.
            Available: official train-test split for the Travel Time Estimation task (TTE)
            and the Human Mobility Prediction task (HMP). Raw data is available as: 'all'.
        resolution (Optional[int]): H3 resolution for hex trajectories.
            Necessary if using 'all' split.

    Returns:
        dict[str, gpd.GeoDataFrame]: Dictionary with all splits loaded from the dataset. Will
            contain keys "train" and "test" if available.

    Raises:
        NotImplementedError: If `version` is not 'TTE', 'HMP' or 'all'.
    """
    # Reject unknown versions up front so nothing on the instance is modified.
    if version not in ("TTE", "HMP", "all"):
        raise NotImplementedError("Version not implemented")

    # Only the raw 'all' version takes a caller-supplied resolution.
    self.resolution = resolution if version == "all" else 9
    return super().load(hf_token=hf_token, version=version)

train_test_split

train_test_split(
    target_column: Optional[str] = None,
    resolution: Optional[int] = None,
    test_size: float = 0.2,
    n_bins: int = 4,
    random_state: Optional[int] = None,
    validation_split: bool = False,
    force_split: bool = False,
    task: Optional[str] = "TTE",
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]

Generate train/test split or train/val split from trajectory GeoDataFrame.

Train-test/train-val split is generated by splitting train_gdf.

PARAMETER DESCRIPTION
target_column

Column identifying each trajectory (contains trajectory ids).

TYPE: str DEFAULT: None

test_size

Fraction of data to be used as test set.

TYPE: float DEFAULT: 0.2

n_bins

Number of stratification bins.

TYPE: int DEFAULT: 4

random_state

Controls the shuffling applied to the data before applying the split. Pass an int for reproducible output across multiple function calls. Defaults to None.

TYPE: int DEFAULT: None

validation_split

If True, creates a validation split from existing train split and assigns it to self.val_gdf.

TYPE: bool DEFAULT: False

force_split

If True, forces a new split to be created, even if an existing train/test or validation split is already present. - With validation_split=False, regenerates and overwrites the test split. - With validation_split=True, regenerates and overwrites the validation split.

TYPE: bool DEFAULT: False

resolution

H3 resolution to regionalize data. Currently ignored in this subclass, different resolutions splits not supported yet. Defaults to default value from the dataset.

TYPE: int DEFAULT: None

task

Task type. Stratifies by duration (TTE) or hex length (HMP).

TYPE: Literal[TTE, HMP] DEFAULT: 'TTE'

RETURNS DESCRIPTION
tuple[GeoDataFrame, GeoDataFrame]

Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: Train/test or train/val GeoDataFrames.

Source code in srai/datasets/_base.py
def train_test_split(
    self,
    target_column: Optional[str] = None,
    resolution: Optional[int] = None,
    test_size: float = 0.2,
    n_bins: int = 4,
    random_state: Optional[int] = None,
    validation_split: bool = False,
    force_split: bool = False,
    task: Optional[str] = "TTE",
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Generate train/test split or train/val split from trajectory GeoDataFrame.

    Train-test/train-val split is generated by splitting train_gdf. Trajectories are
    stratified on a value binned into `n_bins` equal-width bins. `self.train_gdf` is
    replaced by the new train part, and the held-out part is stored in `self.test_gdf`
    or `self.val_gdf`. `self.version` is set to `task`.

    Args:
        target_column (str): Column identifying each trajectory (contains trajectory ids).
            Defaults to `self.target` when not provided.
        resolution (int, optional): H3 resolution to regionalize data. Currently ignored in
            this subclass, different resolutions splits not supported yet.
        test_size (float): Fraction of data to be used as test set.
        n_bins (int): Number of stratification bins.
        random_state (int, optional): Controls the shuffling applied to the data before
            applying the split. Pass an int for reproducible output across multiple
            function calls. Defaults to None.
        validation_split (bool): If True, creates a validation split from existing train split
            and assigns it to self.val_gdf.
        force_split: If True, forces a new split to be created, even if an existing train/test
            or validation split is already present.
            - With `validation_split=False`, regenerates and overwrites the test split.
            - With `validation_split=True`, regenerates and overwrites the validation split.
        task (Literal["TTE", "HMP"]): Task type. Stratifies by duration
            (TTE) or hex length (HMP).

    Returns:
        Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: Train/test or train/val GeoDataFrames.

    Raises:
        ValueError: If the requested split already exists and `force_split` is False, if
            `task` is not 'TTE' or 'HMP', or if the TTE task is requested and neither a
            `duration` nor a `timestamp` column is available.
    """
    if not force_split:
        existing_split = self.val_gdf if validation_split else self.test_gdf
        if existing_split is not None:
            raise ValueError(
                "A split already exists. Use `force_split=True` to overwrite the existing "
                f"{'validation' if validation_split else 'test'} split."
            )
    assert self.train_gdf is not None

    if task not in {"TTE", "HMP"}:
        raise ValueError(f"Unsupported task: {task}")

    trajectory_id_column = target_column or self.target
    gdf_copy = self.train_gdf.copy()

    if task == "TTE":
        self.version = "TTE"

        if "duration" in gdf_copy.columns:
            gdf_copy["stratify_col"] = gdf_copy["duration"]
        elif "timestamp" in gdf_copy.columns:
            # Derive the duration in seconds from the first and last timestamp of each
            # trajectory; trajectories with fewer than two timestamps count as 0 seconds.
            gdf_copy["stratify_col"] = gdf_copy["timestamp"].apply(
                lambda ts: (
                    0.0 if len(ts) < 2 else pd.Timedelta(ts[-1] - ts[0]).total_seconds()
                )
            )
        else:
            raise ValueError(
                "Duration column and timestamp column does not exist. Can't stratify it."
            )
    else:  # task == "HMP", guaranteed by the validation above
        self.version = "HMP"

        def split_sequence(seq):
            # The first 85% of the sequence is the input part, the remainder the part
            # to predict.
            split_idx = int(len(seq) * 0.85)
            if split_idx == len(seq):
                split_idx = len(seq) - 1
            return seq[:split_idx], seq[split_idx:]

        if "h3_sequence_x" not in gdf_copy.columns:
            split_result = gdf_copy["h3_sequence"].apply(split_sequence)
            gdf_copy["h3_sequence_x"] = split_result.apply(operator.itemgetter(0))
            gdf_copy["h3_sequence_y"] = split_result.apply(operator.itemgetter(1))

        # Calculate trajectory length in unique hexagons
        gdf_copy["x_len"] = gdf_copy["h3_sequence_x"].apply(lambda seq: len(set(seq)))
        gdf_copy["y_len"] = gdf_copy["h3_sequence_y"].apply(lambda seq: len(set(seq)))
        gdf_copy["stratify_col"] = gdf_copy["x_len"] + gdf_copy["y_len"]

    gdf_copy["stratification_bin"] = pd.cut(gdf_copy["stratify_col"], bins=n_bins, labels=False)

    trajectory_indices = gdf_copy[trajectory_id_column].unique()
    duration_bins = (
        gdf_copy[[trajectory_id_column, "stratification_bin"]]
        .drop_duplicates()
        .set_index(trajectory_id_column)["stratification_bin"]
    )

    # NOTE(review): inside the class this name resolves to the module-level
    # `train_test_split` (presumably scikit-learn's), not to this method - confirm import.
    train_indices, test_indices = train_test_split(
        trajectory_indices,
        test_size=test_size,
        stratify=duration_bins.loc[trajectory_indices],
        random_state=random_state,
    )

    # Helper columns exist only for stratification and must not leak into either split.
    # Previously the train filter checked the already-cleaned test frame, so nothing was
    # dropped from the train split.
    helper_columns = ["x_len", "y_len", "stratification_bin", "stratify_col"]
    train_gdf = gdf_copy[gdf_copy[trajectory_id_column].isin(train_indices)].drop(
        columns=helper_columns, errors="ignore"
    )
    test_gdf = gdf_copy[gdf_copy[trajectory_id_column].isin(test_indices)].drop(
        columns=helper_columns, errors="ignore"
    )

    self.train_gdf = train_gdf
    if not validation_split:
        self.test_gdf = test_gdf
        print(
            f"Created new train_gdf and test_gdf. Train len: {len(self.train_gdf)}, "
            f"test len: {len(test_gdf)}"
        )
    else:
        self.val_gdf = test_gdf
        test_len = len(self.test_gdf) if self.test_gdf is not None else 0
        print(
            f"Created new train_gdf and val_gdf. Test split remains unchanged. "
            f"Train len: {len(self.train_gdf)}, val len: {len(test_gdf)}, "
            f"test len: {test_len}"
        )
    return train_gdf, test_gdf