Skip to content

parquet multiprocessing

map_parquet_dataset(
    dataset_path,
    destination_path,
    function,
    columns=None,
    progress_bar=None,
)

Apply a function over a parquet dataset in a multiprocessing environment.

Will save results in multiple files in the destination path.

PARAMETER DESCRIPTION
dataset_path

Path of the parquet dataset.

TYPE: Path

destination_path

Path of the destination.

TYPE: Path

function

Function to apply over a row group table. Will save resulting table in a new parquet file.

TYPE: Callable[[Table], Table]

columns

List of columns to read. Defaults to None.

TYPE: Optional[list[str]] DEFAULT: None

progress_bar

Progress bar to show task status. Defaults to None.

TYPE: Optional[TaskProgressBar] DEFAULT: None

Source code in quackosm/_parquet_multiprocessing.py
def map_parquet_dataset(
    dataset_path: Path,
    destination_path: Path,
    function: Callable[[pa.Table], pa.Table],
    columns: Optional[list[str]] = None,
    progress_bar: Optional[TaskProgressBar] = None,
) -> None:
    """
    Apply a function over a parquet dataset in a multiprocessing environment.

    Will save results in multiple files in the destination path.

    Args:
        dataset_path (Path): Path of the parquet dataset.
        destination_path (Path): Path of the destination.
        function (Callable[[pa.Table], pa.Table]): Function to apply over a row group table.
            Will save resulting table in a new parquet file.
        columns (Optional[list[str]]): List of columns to read. Defaults to `None`.
        progress_bar (Optional[TaskProgressBar]): Progress bar to show task status.
            Defaults to `None`.
    """
    dataset = pq.ParquetDataset(dataset_path)

    # Enumerate every (file, row group) pair so each one can be processed
    # independently by a worker.
    tuples_to_queue = [
        (pq_file, row_group)
        for pq_file in dataset.files
        for row_group in range(pq.ParquetFile(pq_file).num_row_groups)
    ]

    total = len(tuples_to_queue)
    if progress_bar:  # pragma: no cover
        progress_bar.create_manual_bar(total=total)

    # Manager-backed queue so it can be shared with spawned worker processes.
    queue: Queue[tuple[str, int]] = ctx.Manager().Queue()
    for queue_tuple in tuples_to_queue:
        queue.put(queue_tuple)

    destination_path.mkdir(parents=True, exist_ok=True)

    # Initialize before the try block: if worker construction inside the try
    # raises, the finally clause would otherwise fail with NameError on
    # `processes`, masking the original exception.
    processes: list[WorkerProcess] = []
    try:
        # Spawn at most one worker per queued row group (no idle workers).
        processes = [
            WorkerProcess(
                target=_job,
                args=(queue, destination_path, function, columns),
            )
            for _ in range(min(multiprocessing.cpu_count(), total))
        ]
        _run_processes(processes=processes, queue=queue, total=total, progress_bar=progress_bar)
    finally:  # pragma: no cover
        # Surface any exceptions raised inside worker processes.
        _report_exceptions(processes=processes)