# Implement TripsLayer for animating moving objects and connect to MovingPandas (#292)

A minimal example:

![Screen Recording 2023-12-05 at 5 15 35 PM](https://github.com/developmentseed/lonboard/assets/15164633/bef41324-f838-4a25-b9ec-1a1dbb948e56)

### Change list

- Add new `TripsLayer` under the `experimental` module
- Add dev dependency on `movingpandas`
- Add `from_movingpandas` class method to construct a `TripsLayer` from
a movingpandas `TrajectoryCollection`

### Todo

- [x] Implement conversion from `movingpandas.TrajectoryCollection` to
GeoArrow.
- [x] More input validation in `TimestampAccessor`. Validate that the
timestamps have the same list offsets as the geometry in the main data
frame. (Done in
37a64c6)
- [x] Re-implement this example:
https://movingpandas.github.io/movingpandas-website/2-analysis-examples/ship-data.html
- [x] Store a `time_offset` integer on the `TripsLayer` that represents
the minimum timestamp value of the trip data. Note that this needs to be
recomputed when a new `get_timestamps` is assigned onto the layer; see
the sketch after this list. (Done in
6addb2e)
- [x] Implement custom serialization for the timestamp accessor.
Subtract off the time offset when serializing the data, and cast to
float32. (Done in
b346810)
- [x] Add timezone parameter? (we infer the timezone from the input
data)
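
For the recompute-on-assignment note above, the usual traitlets pattern looks roughly like this. This is a hedged sketch with hypothetical trait names, not the PR's actual implementation:

```py
import traitlets

class TimestampedLayer(traitlets.HasTraits):
    # Hypothetical stand-ins for the real TripsLayer traits
    get_timestamps = traitlets.List()
    _time_offset = traitlets.Int(0)

    @traitlets.observe("get_timestamps")
    def _recompute_time_offset(self, change):
        # Re-derive the minimum whenever the accessor is reassigned
        self._time_offset = min(change["new"]) if change["new"] else 0
```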

### Open questions

- How to handle offsetted timestamps? deck.gl stores timestamps as
float32, whose 24-bit significand only represents integers exactly up to
2^24 = 16,777,216, so there isn't enough integer precision to store
milliseconds or nanoseconds since the epoch; see the sketch after this
list. (Timestamp precision handling done in
ca9dcd1)
- Where to handle animation? It looks like syncing animation via an
`ipywidgets.Play` widget (connected via jslink) is probably good enough
for now, even if it appears to have a decent amount of overhead. The
alternative would be to have a manual animation component on the JS side
that maintains its own time state.
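
A quick standalone numpy illustration of the float32 problem, and of why subtracting a per-dataset start offset fixes it:

```py
import numpy as np

# Millisecond epoch timestamps (~1.7e12) are far beyond float32's exact
# integer range (2**24 = 16,777,216), so a cast silently loses precision:
t_ms = 1_701_804_175_229
print(int(np.float32(t_ms)) - t_ms)  # nonzero: off by tens of seconds

# Subtracting the dataset's minimum timestamp keeps values small and exact:
rel_ms = t_ms - 1_701_800_000_000  # 4_175_229, well under 2**24
print(int(np.float32(rel_ms)) - rel_ms)  # 0
```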

### Example repro

I got data from [Access AIS](https://marinecadastre.gov/accessais/),
with a custom bounding box and time range, though it would probably be
straightforward to use [other data
files](https://marinecadastre.gov/ais/) as well.

```py
import pandas as pd
import movingpandas as mpd
import ipywidgets
from lonboard import Map
from lonboard.experimental import TripsLayer

path = '/Users/kyle/Downloads/AIS_170180417406763049_2306-1701804175229.csv'
df = pd.read_csv(path)
traj_collection = mpd.TrajectoryCollection(df, 'MMSI', t='BaseDateTime', x='LON', y='LAT')

# Construct the layer from the trajectory collection and style it
layer = TripsLayer.from_movingpandas(traj_collection)
layer.width_min_pixels = 5
layer.trail_length = 100000

m = Map([layer])

# Play from 0 to ~24 hours (in milliseconds), advancing 50 seconds per 50 ms tick
play = ipywidgets.Play(
    value=0,
    min=0,
    max=86399000,
    step=50_000,
    interval=50,
    repeat=True,
)
ipywidgets.jsdlink(
    (play, 'value'),
    (layer, 'current_time'),
)

# Display the controls and the map (in a notebook, these would be separate cells)
play
m
```
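
Note that `ipywidgets.jsdlink` creates a one-directional link on the JavaScript side, so each tick of the `Play` widget updates the layer's `current_time` in the browser without a round trip through the Python kernel.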

cc @anitagraser, you may be interested in this, and/or have ideas for
how to better integrate with movingpandas

---------

Co-authored-by: Joris Van den Bossche <[email protected]>
kylebarron and jorisvandenbossche authored Oct 7, 2024
1 parent 3ee9297 commit 25f7e13
Showing 24 changed files with 2,244 additions and 374 deletions.
Binary file added assets/air-traffic-control.gif
Binary file added assets/ais-movingpandas.gif
13 changes: 13 additions & 0 deletions docs/api/layers/trips-layer.md
@@ -0,0 +1,13 @@
# TripsLayer

<img src="../../../assets/air-traffic-control.gif" height="300px">

> Screenshot from Air Traffic Control example.
::: lonboard.experimental.TripsLayer
    options:
        filters:
            - "!^_"
            - "!from_duckdb"
            - "!from_geopandas"
        inherited_members: true
1 change: 1 addition & 0 deletions docs/api/traits.md
@@ -7,3 +7,4 @@
::: lonboard.traits.NormalAccessor
::: lonboard.traits.PointAccessor
::: lonboard.traits.ArrowTableTrait
::: lonboard.experimental.traits.TimestampAccessor
194 changes: 194 additions & 0 deletions examples/air-traffic-control.ipynb

Large diffs are not rendered by default.

570 changes: 570 additions & 0 deletions examples/ais-movingpandas.ipynb

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions examples/index.md
@@ -5,6 +5,7 @@
- [Speedtest data ![](../assets/scatterplot-layer-network-speeds.jpg)](../examples/internet-speeds) using [`ScatterplotLayer`](../api/layers/scatterplot-layer)
- [North America roads ![](../assets/path-layer-roads.jpg)](../examples/north-america-roads) using [`PathLayer`](../api/layers/path-layer)
- [Overture Maps buildings ![](../assets/overture.jpg)](../examples/overture-maps) using [`PolygonLayer`](../api/layers/polygon-layer)
- [Air Traffic Control animation ![](../assets/air-traffic-control.gif)](../examples/air-traffic-control) using [`TripsLayer`](../api/layers/trips-layer)
- [Global boundaries ![](../assets/boundaries.png)](../examples/global-boundaries) using [`PolygonLayer`](../api/layers/polygon-layer)
- [U.S. County-to-County Migration ![](../assets/arc-layer-migration-example.gif)](../examples/migration) using [`ArcLayer`](../api/layers/arc-layer) and [`BrushingExtension`](../api/layer-extensions/brushing-extension)
- [Scatterplot with GPU data filtering ![](../assets/data-filter-extension.gif)](../examples/data-filter-extension) using [`ScatterplotLayer`](../api/layers/scatterplot-layer) and [`DataFilterExtension`](../api/layer-extensions/data-filter-extension)
@@ -22,6 +23,8 @@
- [DuckDB Spatial ![](../assets/duckdb-heatmap.jpg)](../examples/duckdb) using [`HeatmapLayer`](../api/layers/heatmap-layer)
- [Color picker integration ![](../assets/color-picker.jpg)](../examples/integrations/color-picker) using [`SolidPolygonLayer`](../api/layers/solid-polygon-layer)
- [JupyterLab Sidecar integration ![](../assets/jupyter-sidecar.jpg)](../examples/integrations/sidecar/) using [`ScatterplotLayer`](../api/layers/scatterplot-layer) and [`JupyterLab Sidecar`](https://github.com/jupyter-widgets/jupyterlab-sidecar)
- [MovingPandas ![](../assets/ais-movingpandas.gif)](../examples/ais-movingpandas) using [`TripsLayer`](../api/layers/trips-layer)


</div>

2 changes: 1 addition & 1 deletion lonboard/__init__.py
@@ -2,7 +2,7 @@
Python library for fast, interactive geospatial vector data visualization in Jupyter.
"""

-from . import colormap, controls, layer_extension, traits
+from . import colormap, controls, experimental, layer_extension, traits
 from ._layer import (
     BaseArrowLayer,
     BaseLayer,
8 changes: 8 additions & 0 deletions lonboard/_constants.py
@@ -2,6 +2,14 @@

import pyproj

# Minimum integer representable in a float32
# https://stackoverflow.com/a/3793950
MIN_INTEGER_FLOAT32 = -16777216

# Maximum integer representable in a float32
# https://stackoverflow.com/a/3793950
MAX_INTEGER_FLOAT32 = 16777216

EPSG_4326 = pyproj.CRS("epsg:4326")

# In pyodide, the pyproj PROJ data directory is much smaller, and it currently
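
A quick standalone numpy check of the boundary these constants encode (not part of the diff):

```py
import numpy as np

# Below 2**24 consecutive integers are distinct in float32; at the
# boundary they start to collapse:
assert np.float32(16_777_215) != np.float32(16_777_216)
assert np.float32(16_777_216) == np.float32(16_777_217)
```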
192 changes: 192 additions & 0 deletions lonboard/_geoarrow/movingpandas_interop.py
@@ -0,0 +1,192 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Dict, List, Tuple

import numpy as np
from arro3.core import (
    Array,
    ChunkedArray,
    DataType,
    Field,
    RecordBatch,
    Schema,
    Table,
    fixed_size_list_array,
    list_array,
)

if TYPE_CHECKING:
    import movingpandas as mpd
    import pandas as pd
    import pyarrow as pa
    from movingpandas import TrajectoryCollection


# TODO (lonboard-specific):
# - update timestamp serialization to cast to float32 at that point
# # offset by earliest timestamp
# timestamps -= timestamps.min()

# # Cast to float32
# timestamps = timestamps.astype(np.float32)


def movingpandas_to_geoarrow(
    traj_collection: TrajectoryCollection,
) -> Tuple[Table, ChunkedArray]:
"""Convert a MovingPandas TrajectoryCollection to GeoArrow
Args:
traj_collection: _description_
Returns:
_description_
"""
    import pyarrow as pa
    import shapely

    crs = traj_collection.get_crs()
    crs_json = crs.to_json_dict() if crs is not None else None

    num_coords = 0
    num_trajectories = len(traj_collection)
    offsets = np.zeros(num_trajectories + 1, dtype=np.int32)
    datetime_dtypes = set()
    attr_schemas: List[pa.Schema] = []

    # Loop the first time to infer offsets for each trajectory
    for i, traj in enumerate(traj_collection.trajectories):
        traj: mpd.Trajectory

        num_coords += traj.size()
        offsets[i + 1] = num_coords
        datetime_dtypes.add(traj.df.index.dtype)

        geom_col_name = traj.get_geom_col()
        df_attr = traj.df.drop(columns=[geom_col_name])

        # Explicitly drop index because the index is a DatetimeIndex that we convert
        # manually later.
        arrow_schema = pa.Schema.from_pandas(df_attr, preserve_index=False)
        attr_schemas.append(arrow_schema)

    assert (
        len(datetime_dtypes) == 1
    ), "Expected only one datetime dtype across all trajectories."
    datetime_dtype = list(datetime_dtypes)[0]

    # We currently always provision space for XYZ coordinates, and then only use 2d if
    # the Z dimension is always NaN
    coords = np.zeros((num_coords, 3), dtype=np.float64)

    arrow_timestamp_dtype = infer_timestamp_dtype(datetime_dtype)
    timestamps = np.zeros(num_coords, dtype=np.int64)

    attr_schema = pa.unify_schemas(attr_schemas, promote_options="permissive")
    attr_tables: List[pa.Table] = []

    # Loop second time to fill timestamps and coords
    for i, traj in enumerate(traj_collection.trajectories):
        start_offset = offsets[i]
        end_offset = offsets[i + 1]

        timestamps[start_offset:end_offset] = traj.df.index
        coords[start_offset:end_offset] = shapely.get_coordinates(
            traj.df.geometry.array,  # type: ignore
            include_z=True,
        )

        geom_col_name = traj.get_geom_col()
        df_attr = traj.df.drop(columns=[geom_col_name])

        attr_table = pa.Table.from_pandas(
            traj.df, schema=attr_schema, preserve_index=False
        )
        attr_tables.append(attr_table)

    attr_table = pa.concat_tables(attr_tables, promote_options="none")
    attr_table = Table.from_arrow(attr_table)

    offsets = Array.from_numpy(offsets)

    nested_attr_table = apply_offsets_to_table(attr_table, offsets=offsets)

    if np.all(np.isnan(coords[:, 2])):
        coord_list_size = 2
        # Cast to 2D coords
        coords = coords[:, :2]
    else:
        assert not np.any(
            np.isnan(coords[:, 2])
        ), "Mixed 2D and 3D coordinates not currently supported"
        coord_list_size = 3

    coords_arr = Array.from_numpy(coords.ravel("C"))
    coords_fixed_size_list = fixed_size_list_array(coords_arr, coord_list_size)
    linestrings_arr = list_array(offsets, coords_fixed_size_list)

    extension_metadata: Dict[str, str] = {"ARROW:extension:name": "geoarrow.linestring"}
    if crs_json is not None:
        extension_metadata["ARROW:extension:metadata"] = json.dumps({"crs": crs_json})

    linestrings_field = Field(
        "geometry",
        linestrings_arr.type,
        nullable=True,
        metadata=extension_metadata,
    )

    timestamp_values = Array.from_numpy(timestamps).cast(arrow_timestamp_dtype)
    timestamp_arr = list_array(offsets, timestamp_values)
    timestamp_col = ChunkedArray([timestamp_arr])

    table = nested_attr_table.append_column(
        linestrings_field, ChunkedArray([linestrings_arr])
    )
    return table, timestamp_col


def infer_timestamp_dtype(dtype: np.dtype | pd.DatetimeTZDtype) -> DataType:
    """Infer an Arrow timestamp type from a numpy or pandas datetime dtype.

    Raises:
        ValueError: If not a known datetime dtype.
    """
    import pandas as pd

    if isinstance(dtype, pd.DatetimeTZDtype):
        return DataType.timestamp(dtype.unit, tz=str(dtype.tz))

    if dtype.name == "datetime64[s]":
        return DataType.timestamp("s")

    if dtype.name == "datetime64[ms]":
        return DataType.timestamp("ms")

    if dtype.name == "datetime64[us]":
        return DataType.timestamp("us")

    if dtype.name == "datetime64[ns]":
        return DataType.timestamp("ns")

    raise ValueError(f"Unexpected datetime type: {dtype}")


def apply_offsets_to_table(table: Table, offsets: Array) -> Table:
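    """Wrap each column of `table` in a list array using `offsets`,
    producing one list-typed row per trajectory, aligned with the geometry.
    """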
    batch = table.combine_chunks().to_batches()[0]

    new_fields = []
    new_arrays = []

    for field_idx in range(batch.num_columns):
        field = batch.schema.field(field_idx)
        new_field = field.with_type(DataType.list(field))
        new_array = list_array(offsets, batch[field_idx], type=new_field)

        new_fields.append(new_field)
        new_arrays.append(new_array)

    new_schema = Schema(new_fields, metadata=batch.schema.metadata)
    new_batch = RecordBatch(new_arrays, schema=new_schema)
    return Table.from_batches([new_batch])
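
The pattern in `movingpandas_to_geoarrow` and `apply_offsets_to_table` is that the geometry, the timestamps, and every attribute column are list arrays sharing one offsets buffer, so each resulting row is a whole trajectory. A minimal standalone sketch of that idea in plain pyarrow (illustration only, not code from this PR):

```py
import pyarrow as pa

# Five coordinates belonging to two trajectories: the first has 3 points,
# the second has 2. A single offsets buffer slices every parallel array.
offsets = pa.array([0, 3, 5], type=pa.int32())
coords = pa.array(
    [[0.0, 0.0], [1.0, 1.0], [2.0, 2.0], [5.0, 5.0], [6.0, 6.0]],
    type=pa.list_(pa.float64(), 2),
)
speeds = pa.array([1.0, 1.2, 0.9, 3.4, 3.1])

linestrings = pa.ListArray.from_arrays(offsets, coords)
speeds_per_traj = pa.ListArray.from_arrays(offsets, speeds)

print(len(linestrings), len(speeds_per_traj))  # 2 2: one row per trajectory
```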
8 changes: 0 additions & 8 deletions lonboard/_map.py
@@ -347,21 +347,13 @@ def add_layer(

        if isinstance(layers, Map):
            new_layers = layers.layers
-            self.layers += layers.layers
-            # self.layers =x
-            # layers = layers.layers
        elif isinstance(layers, BaseLayer):
            new_layers = (layers,)
-            layers = [layers]
-            self.layers += (layers,)
        else:
            new_layers = tuple(layers)
-            self.layers += tuple(layers)

        self.layers += new_layers

-        # self.layers += tuple(layers)
-
        if focus:
            self.view_state = compute_view(new_layers)  # type: ignore
71 changes: 63 additions & 8 deletions lonboard/_serialization.py
@@ -2,16 +2,28 @@

import math
from io import BytesIO
-from typing import TYPE_CHECKING, List, Optional, Tuple, Union
-
-import numpy as np
-from arro3.core import Array, ChunkedArray, RecordBatch, Table
+from typing import TYPE_CHECKING, List, Optional, Union, overload
+
+import arro3.compute as ac
+from arro3.core import (
+    Array,
+    ChunkedArray,
+    DataType,
+    RecordBatch,
+    Scalar,
+    Table,
+    list_array,
+    list_flatten,
+    list_offsets,
+)
from traitlets import TraitError

+from lonboard._utils import timestamp_start_offset
from lonboard.models import ViewState

if TYPE_CHECKING:
-    from numpy.typing import NDArray
+    from lonboard._layer import BaseArrowLayer
+    from lonboard.experimental._layer import TripsLayer


DEFAULT_PARQUET_COMPRESSION = "ZSTD"
@@ -91,7 +103,20 @@ def serialize_pyarrow_column(
    return serialize_table_to_parquet(pyarrow_table, max_chunksize=max_chunksize)


-def serialize_accessor(data: Union[List[int], Tuple[int], NDArray[np.uint8]], obj):
+@overload
+def serialize_accessor(
+    data: ChunkedArray,
+    obj: BaseArrowLayer,
+) -> List[bytes]: ...
+@overload
+def serialize_accessor(
+    data: Union[str, int, float, list, tuple, bytes],
+    obj: BaseArrowLayer,
+) -> Union[str, int, float, list, tuple, bytes]: ...
+def serialize_accessor(
+    data: Union[str, int, float, list, tuple, bytes, ChunkedArray],
+    obj: BaseArrowLayer,
+):
    if data is None:
        return None

@@ -100,12 +125,12 @@ def serialize_accessor(
    if isinstance(data, (str, int, float, list, tuple, bytes)):
        return data

-    assert isinstance(data, (ChunkedArray, Array))
+    assert isinstance(data, ChunkedArray)
    validate_accessor_length_matches_table(data, obj.table)
    return serialize_pyarrow_column(data, max_chunksize=obj._rows_per_chunk)


-def serialize_table(data, obj):
+def serialize_table(data: Table, obj: BaseArrowLayer):
    assert isinstance(data, Table), "expected Arrow table"
    return serialize_table_to_parquet(data, max_chunksize=obj._rows_per_chunk)

@@ -135,5 +160,35 @@ def serialize_view_state(data: Optional[ViewState], obj):
    return data._asdict()


def serialize_timestamp_accessor(
    timestamps: ChunkedArray, obj: TripsLayer
) -> List[bytes]:
    """
    Subtract off min timestamp to fit into f32 integer range.
    Then cast to float32.
    """
    # Note: this has some overlap with `timestamp_max_physical_value` in utils.
    # Cast to int64 type
    timestamps = timestamps.cast(DataType.list(DataType.int64()))

    start_offset_adjustment = Scalar(
        timestamp_start_offset(timestamps), type=DataType.int64()
    )

    list_offsets_iter = list_offsets(timestamps)
    inner_values_iter = list_flatten(timestamps)

    offsetted_chunks = []
    for offsets, inner_values in zip(list_offsets_iter, inner_values_iter):
        offsetted_values = ac.add(inner_values, start_offset_adjustment)
        f32_values = offsetted_values.cast(DataType.int64()).cast(DataType.float32())
        offsetted_chunks.append(list_array(offsets, f32_values))

    f32_timestamps_col = ChunkedArray(offsetted_chunks)
    return serialize_accessor(f32_timestamps_col, obj)


ACCESSOR_SERIALIZATION = {"to_json": serialize_accessor}
TIMESTAMP_ACCESSOR_SERIALIZATION = {"to_json": serialize_timestamp_accessor}
TABLE_SERIALIZATION = {"to_json": serialize_table}
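
In plain numpy terms, the transform above amounts to the following (a standalone sketch, not the arro3 implementation):

```py
import numpy as np

ts = np.array([1_701_800_000_000, 1_701_804_175_229], dtype=np.int64)  # epoch ms
rel = (ts - ts.min()).astype(np.float32)
# rel == [0.0, 4175229.0]; both are exact because they fit under 2**24
```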
