Apply a function over a parquet dataset in a multiprocessing environment.
Will save results in multiple files in a destination path.
PARAMETER |
DESCRIPTION |
dataset_path |
Path of the parquet dataset.
TYPE:
Path
|
destination_path |
Path of the destination.
TYPE:
Path
|
function |
Function to apply over a row group table.
Will save resulting table in a new parquet file.
TYPE:
Callable[[Table], Table]
|
columns |
List of columns to read. Defaults to None.
TYPE:
Optional[list[str]]
DEFAULT:
None
|
progress_bar |
Progress bar to show task status.
Defaults to None.
TYPE:
Optional[TaskProgressBar]
DEFAULT:
None
|
Source code in quackosm/_parquet_multiprocessing.py
def map_parquet_dataset(
    dataset_path: Path,
    destination_path: Path,
    function: Callable[[pa.Table], pa.Table],
    columns: Optional[list[str]] = None,
    progress_bar: Optional[TaskProgressBar] = None,
) -> None:
    """
    Apply a function over a parquet dataset in a multiprocessing environment.

    Every (file, row group) pair found in the dataset becomes one work item on a
    shared queue that is handed to the worker processes. Will save results in
    multiple files in a destination path.

    Args:
        dataset_path (Path): Path of the parquet dataset.
        destination_path (Path): Path of the destination. Created (with parents)
            if it does not exist yet.
        function (Callable[[pa.Table], pa.Table]): Function to apply over a row group table.
            Will save resulting table in a new parquet file.
        columns (Optional[list[str]]): List of columns to read. Defaults to `None`.
        progress_bar (Optional[TaskProgressBar]): Progress bar to show task status.
            Defaults to `None`.
    """
    dataset = pq.ParquetDataset(dataset_path)

    # One work item per row group of every file in the dataset.
    tuples_to_queue = [
        (pq_file, row_group)
        for pq_file in dataset.files
        for row_group in range(pq.ParquetFile(pq_file).num_row_groups)
    ]
    total = len(tuples_to_queue)

    if progress_bar:  # pragma: no cover
        progress_bar.create_manual_bar(total=total)

    # Manager-backed queue so it can be passed to the worker processes as an argument.
    queue: Queue[tuple[str, int]] = ctx.Manager().Queue()
    for queue_tuple in tuples_to_queue:
        queue.put(queue_tuple)

    destination_path.mkdir(parents=True, exist_ok=True)

    # Build the workers *before* entering the `try`: the `finally` clause reads
    # `processes`, so binding it inside the `try` would turn any failure during
    # construction into a `NameError` that hides the real exception.
    # Never create more workers than there are work items.
    processes = [
        WorkerProcess(
            target=_job,
            args=(queue, destination_path, function, columns),
        )
        for _ in range(min(multiprocessing.cpu_count(), total))
    ]

    try:
        _run_processes(processes=processes, queue=queue, total=total, progress_bar=progress_bar)
    finally:  # pragma: no cover
        # Always inspect the workers, even when running them raised.
        _report_exceptions(processes=processes)
|