|
| 1 | +"""Reproject a GeoArrow array |
| 2 | +""" |
| 3 | +import json |
| 4 | +import warnings |
| 5 | +from concurrent.futures import ThreadPoolExecutor |
| 6 | +from functools import lru_cache, partial |
| 7 | +from typing import Callable, Optional, Tuple, Union |
| 8 | + |
| 9 | +import numpy as np |
| 10 | +import pyarrow as pa |
| 11 | +from pyproj import CRS, Transformer |
| 12 | + |
| 13 | +from lonboard._constants import EXTENSION_NAME, OGC_84 |
| 14 | +from lonboard._geoarrow.extension_types import CoordinateDimension |
| 15 | +from lonboard._utils import get_geometry_column_index |
| 16 | + |
| 17 | +TransformerFromCRS = lru_cache(Transformer.from_crs) |
| 18 | + |
| 19 | + |
| 20 | +def reproject_table( |
| 21 | + table: pa.Table, |
| 22 | + *, |
| 23 | + to_crs: Union[str, CRS] = OGC_84, |
| 24 | + max_workers: Optional[int] = None, |
| 25 | +) -> pa.Table: |
| 26 | + """Reproject a GeoArrow table to a new CRS |
| 27 | +
|
| 28 | + Args: |
| 29 | + table: The table to reproject. |
| 30 | + to_crs: The target CRS. Defaults to OGC_84. |
| 31 | + max_workers: The maximum number of threads to use. Defaults to None. |
| 32 | +
|
| 33 | + Returns: |
| 34 | + A new table. |
| 35 | + """ |
| 36 | + geom_col_idx = get_geometry_column_index(table.schema) |
| 37 | + # No geometry column in table |
| 38 | + if geom_col_idx is None: |
| 39 | + return table |
| 40 | + |
| 41 | + geom_field = table.schema.field(geom_col_idx) |
| 42 | + geom_column = table.column(geom_col_idx) |
| 43 | + |
| 44 | + # geometry column exists in table but is not assigned a CRS |
| 45 | + if b"ARROW:extension:metadata" not in geom_field.metadata: |
| 46 | + return table |
| 47 | + |
| 48 | + new_field, new_column = reproject_column( |
| 49 | + field=geom_field, column=geom_column, to_crs=to_crs, max_workers=max_workers |
| 50 | + ) |
| 51 | + return table.set_column(geom_col_idx, new_field, new_column) |
| 52 | + |
| 53 | + |
| 54 | +def reproject_column( |
| 55 | + *, |
| 56 | + field: pa.Field, |
| 57 | + column: pa.ChunkedArray, |
| 58 | + to_crs: Union[str, CRS] = OGC_84, |
| 59 | + max_workers: Optional[int] = None, |
| 60 | +) -> Tuple[pa.Field, pa.ChunkedArray]: |
| 61 | + """Reproject a GeoArrow array to a new CRS |
| 62 | +
|
| 63 | + Args: |
| 64 | + field: The field describing the column |
| 65 | + column: A ChunkedArray |
| 66 | + to_crs: The target CRS. Defaults to OGC_84. |
| 67 | + max_workers: The maximum number of threads to use. Defaults to None. |
| 68 | + """ |
| 69 | + extension_type_name = field.metadata[b"ARROW:extension:name"] |
| 70 | + extension_metadata = json.loads(field.metadata[b"ARROW:extension:metadata"]) |
| 71 | + crs_str = extension_metadata["crs"] |
| 72 | + existing_crs = CRS(crs_str) |
| 73 | + |
| 74 | + if existing_crs == to_crs: |
| 75 | + return field, column |
| 76 | + |
| 77 | + # NOTE: Not sure the best place to put this warning |
| 78 | + warnings.warn("Input being reprojected to EPSG:4326 CRS") |
| 79 | + |
| 80 | + transformer = TransformerFromCRS(existing_crs, to_crs, always_xy=True) |
| 81 | + |
| 82 | + # Metadata inside metadata, bad naming |
| 83 | + new_extension_meta_meta = {"crs": CRS(to_crs).to_json()} |
| 84 | + new_extension_metadata = { |
| 85 | + b"ARROW:extension:name": extension_type_name, |
| 86 | + b"ARROW:extension:metadata": json.dumps(new_extension_meta_meta), |
| 87 | + } |
| 88 | + |
| 89 | + new_chunked_array = _reproject_column( |
| 90 | + column, |
| 91 | + extension_type_name=extension_type_name, |
| 92 | + transformer=transformer, |
| 93 | + max_workers=max_workers, |
| 94 | + ) |
| 95 | + return field.with_metadata(new_extension_metadata), new_chunked_array |
| 96 | + |
| 97 | + |
| 98 | +def _reproject_column( |
| 99 | + column: pa.ChunkedArray, |
| 100 | + *, |
| 101 | + extension_type_name: EXTENSION_NAME, |
| 102 | + transformer: Transformer, |
| 103 | + max_workers: Optional[int] = None, |
| 104 | +) -> pa.ChunkedArray: |
| 105 | + if extension_type_name == EXTENSION_NAME.POINT: |
| 106 | + func = partial(_reproject_chunk_nest_0, transformer=transformer) |
| 107 | + elif extension_type_name in [EXTENSION_NAME.LINESTRING, EXTENSION_NAME.MULTIPOINT]: |
| 108 | + func = partial(_reproject_chunk_nest_1, transformer=transformer) |
| 109 | + elif extension_type_name in [ |
| 110 | + EXTENSION_NAME.POLYGON, |
| 111 | + EXTENSION_NAME.MULTILINESTRING, |
| 112 | + ]: |
| 113 | + func = partial(_reproject_chunk_nest_2, transformer=transformer) |
| 114 | + |
| 115 | + elif extension_type_name == EXTENSION_NAME.MULTIPOLYGON: |
| 116 | + func = partial(_reproject_chunk_nest_3, transformer=transformer) |
| 117 | + else: |
| 118 | + raise ValueError(f"Unexpected extension type name {extension_type_name}") |
| 119 | + |
| 120 | + with ThreadPoolExecutor(max_workers=max_workers) as executor: |
| 121 | + return pa.chunked_array(executor.map(func, column.chunks)) |
| 122 | + |
| 123 | + |
| 124 | +def _reproject_coords(arr: pa.FixedSizeListArray, transformer: Transformer): |
| 125 | + list_size = arr.type.list_size |
| 126 | + np_arr = arr.flatten().to_numpy().reshape(-1, list_size) |
| 127 | + |
| 128 | + if list_size == 2: |
| 129 | + output_np_arr = np.column_stack( |
| 130 | + transformer.transform(np_arr[:, 0], np_arr[:, 1]) |
| 131 | + ) |
| 132 | + dims = CoordinateDimension.XY |
| 133 | + elif list_size == 3: |
| 134 | + output_np_arr = np.column_stack( |
| 135 | + transformer.transform(np_arr[:, 0], np_arr[:, 1], np_arr[:, 2]) |
| 136 | + ) |
| 137 | + dims = CoordinateDimension.XYZ |
| 138 | + else: |
| 139 | + raise ValueError(f"Unexpected list size {list_size}") |
| 140 | + |
| 141 | + coord_field = pa.list_(pa.field(dims, pa.float64()), len(dims)) |
| 142 | + return pa.FixedSizeListArray.from_arrays( |
| 143 | + output_np_arr.flatten("C"), type=coord_field |
| 144 | + ) |
| 145 | + |
| 146 | + |
| 147 | +def _reproject_chunk_nest_0(arr: pa.ListArray, transformer: Transformer): |
| 148 | + callback = partial(_reproject_coords, transformer=transformer) |
| 149 | + return _map_coords_nest_0(arr, callback) |
| 150 | + |
| 151 | + |
| 152 | +def _reproject_chunk_nest_1(arr: pa.ListArray, transformer: Transformer): |
| 153 | + callback = partial(_reproject_coords, transformer=transformer) |
| 154 | + return _map_coords_nest_1(arr, callback) |
| 155 | + |
| 156 | + |
| 157 | +def _reproject_chunk_nest_2(arr: pa.ListArray, transformer: Transformer): |
| 158 | + callback = partial(_reproject_coords, transformer=transformer) |
| 159 | + return _map_coords_nest_2(arr, callback) |
| 160 | + |
| 161 | + |
| 162 | +def _reproject_chunk_nest_3(arr: pa.ListArray, transformer: Transformer): |
| 163 | + callback = partial(_reproject_coords, transformer=transformer) |
| 164 | + return _map_coords_nest_3(arr, callback) |
| 165 | + |
| 166 | + |
| 167 | +def _map_coords_nest_0( |
| 168 | + arr: pa.FixedSizeListArray, |
| 169 | + callback: Callable[[pa.FixedSizeListArray], pa.FixedSizeListArray], |
| 170 | +): |
| 171 | + new_coords = callback(arr) |
| 172 | + return new_coords |
| 173 | + |
| 174 | + |
| 175 | +def _map_coords_nest_1( |
| 176 | + arr: pa.ListArray, |
| 177 | + callback: Callable[[pa.FixedSizeListArray], pa.FixedSizeListArray], |
| 178 | +): |
| 179 | + geom_offsets = arr.offsets |
| 180 | + coords = arr.flatten() |
| 181 | + new_coords = callback(coords) |
| 182 | + new_geometry_array = pa.ListArray.from_arrays(geom_offsets, new_coords) |
| 183 | + return new_geometry_array |
| 184 | + |
| 185 | + |
| 186 | +def _map_coords_nest_2( |
| 187 | + arr: pa.ListArray, |
| 188 | + callback: Callable[[pa.FixedSizeListArray], pa.FixedSizeListArray], |
| 189 | +): |
| 190 | + geom_offsets = arr.offsets |
| 191 | + ring_offsets = arr.flatten().offsets |
| 192 | + coords = arr.flatten().flatten() |
| 193 | + new_coords = callback(coords) |
| 194 | + new_ring_array = pa.ListArray.from_arrays(ring_offsets, new_coords) |
| 195 | + new_geometry_array = pa.ListArray.from_arrays(geom_offsets, new_ring_array) |
| 196 | + return new_geometry_array |
| 197 | + |
| 198 | + |
| 199 | +def _map_coords_nest_3( |
| 200 | + arr: pa.ListArray, |
| 201 | + callback: Callable[[pa.FixedSizeListArray], pa.FixedSizeListArray], |
| 202 | +): |
| 203 | + geom_offsets = arr.offsets |
| 204 | + polygon_offsets = arr.flatten().offsets |
| 205 | + ring_offsets = arr.flatten().flatten().offsets |
| 206 | + coords = arr.flatten().flatten().flatten() |
| 207 | + new_coords = callback(coords) |
| 208 | + new_ring_array = pa.ListArray.from_arrays(ring_offsets, new_coords) |
| 209 | + new_polygon_array = pa.ListArray.from_arrays(polygon_offsets, new_ring_array) |
| 210 | + new_geometry_array = pa.ListArray.from_arrays(geom_offsets, new_polygon_array) |
| 211 | + return new_geometry_array |
0 commit comments