111 changes: 111 additions & 0 deletions dcpy/connectors/overture.py
@@ -0,0 +1,111 @@
from overturemaps import cli as overturemaps_cli
from overturemaps.core import record_batch_reader, type_theme_map
from pathlib import Path
from typing import Literal

from dcpy.connectors.registry import Connector

BASE_URL = "az://overturemapswestus2.blob.core.windows.net"

RELEASE = "2025-10-22.0"

OvertureType = Literal[
"address",
"bathymetry",
"building",
"building_part",
"division",
"division_area",
"division_boundary",
"place",
"segment",
"connector",
"infrastructure",
"land",
"land_cover",
"land_use",
"water",
]

nyc_bbox = (-74.2591, 40.4766, -73.7002, 40.9174)


# took 27 s to pull NYC data on my home wifi
def download(
*,
bbox: tuple[float, float, float, float] = nyc_bbox,
output_format: Literal["geoparquet", "geojson", "geojsonseq"] = "geoparquet",
output: Path | None = None,
type_: OvertureType,
release: str | None = RELEASE,
connect_timeout: int | None = None,
request_timeout: int | None = None,
stac: bool = True,
):
output = output or Path(f"{type_}.{output_format}")

reader = record_batch_reader(
type_, bbox, release, connect_timeout, request_timeout, stac
)

if reader is None:
return

with overturemaps_cli.get_writer(
output_format, output, schema=reader.schema
) as writer:
overturemaps_cli.copy(reader, writer)

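# Example usage (illustrative; the type and output path below are assumptions, not part of this module):
#     download(type_="building", output=Path("buildings.parquet"))
#     download(type_="place", output_format="geojson", output=Path("places.geojson"))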

# took 36 s to pull NYC data on my home wifi
def download_duckdb(
type_: OvertureType,
output: Path,
bbox: tuple[float, float, float, float] = nyc_bbox,
):
import duckdb

xmin, ymin, xmax, ymax = bbox

duckdb.sql(f"""
LOAD spatial; -- noqa

SET s3_region='us-west-2';

COPY(
SELECT
*
FROM
read_parquet('{BASE_URL}/release/{RELEASE}/theme={type_theme_map[type_]}/type={type_}/*', filename=true, hive_partitioning=1)
WHERE
bbox.xmin <= {xmax} AND
bbox.xmax >= {xmin} AND
bbox.ymin <= {ymax} AND
bbox.ymax >= {ymin}
) TO '{output}';
""")

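# Example usage (illustrative; the file name is an assumption):
#     download_duckdb("building", Path("buildings.parquet"))
# The WHERE clause filters on the per-feature bbox struct columns shipped in the
# Overture GeoParquet release; DuckDB chooses the output format from the file extension.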

class OvertureConnector(Connector):
conn_type: str = "overture"

def _pull(
self,
key: str,
destination_path: Path,
*,
format: Literal["geoparquet", "geojson"] = "geoparquet",
filename: str | None = None,
bbox: tuple[float, float, float, float] = nyc_bbox,
**kwargs,
) -> dict:
extension = "parquet" if format == "geoparquet" else format
filename = filename or f"{key}.{extension}"
destination_path.mkdir(parents=True, exist_ok=True)
output = destination_path / filename
download(type_=key, bbox=bbox, output_format=format, output=output)
# download_duckdb(type_=key, bbox=bbox, output=output)
return {"path": output}

def pull(self, key: str, destination_path: Path, **kwargs):
return self._pull(key, destination_path, **kwargs)
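# Example usage (illustrative; the key must be a valid OvertureType value and the
# destination directory is an assumption):
#     OvertureConnector().pull("building", Path("./data"), bbox=nyc_bbox)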
3 changes: 2 additions & 1 deletion dcpy/lifecycle/connector_registry.py
@@ -14,7 +14,7 @@
)
from dcpy.connectors.socrata.connector import SocrataConnector
from dcpy.connectors.esri.arcgis_feature_service import ArcGISFeatureServiceConnector
from dcpy.connectors import filesystem, web, s3, ingest_datastore, sftp
from dcpy.connectors import filesystem, web, s3, ingest_datastore, sftp, overture
from dcpy.connectors.registry import (
ConnectorRegistry,
Connector,
@@ -81,6 +81,7 @@ def _set_default_connectors():
),
"ginger", # TODO - name and env var names should be configurable
],
overture.OvertureConnector(),
]

for conn in conns:
46 changes: 29 additions & 17 deletions dcpy/lifecycle/data_loader.py
@@ -4,12 +4,14 @@
import pandas as pd
from pathlib import Path
import typer
from typing import Literal

from dcpy.lifecycle import config
from dcpy.lifecycle.connector_registry import connectors
from dcpy.models.connectors.edm.recipes import DatasetType
from dcpy.models.lifecycle.builds import InputDataset
from dcpy.utils import postgres
from dcpy.utils import duckdb
from dcpy.utils.geospatial import parquet as geoparquet
from dcpy.utils.logging import logger

@@ -65,6 +67,7 @@ def load_dataset_into_pg(
*,
include_version_col: bool = True,
include_ogc_fid_col: bool = True,
engine: Literal["pandas", "duckdb"] = "duckdb",
):
has_preprocessor = ds.preprocessor is not None
preprocessor = _get_preprocessor(ds)
@@ -81,16 +84,9 @@
target_table_name=ds_table_name,
)
else:
if ds.file_type == DatasetType.csv:
raw_df = pd.read_csv(local_dataset_path, dtype=str)
df = preprocessor(ds.id, raw_df) if has_preprocessor else raw_df
elif ds.file_type == DatasetType.parquet:
raw_df = geoparquet.read_df(local_dataset_path)
df = preprocessor(ds.id, raw_df) if has_preprocessor else raw_df
elif ds.file_type == DatasetType.json:
if ds.file_type == DatasetType.json:
with open(local_dataset_path, "r") as json_file:
records = json.load(json_file)

if not has_preprocessor:
logger.warning(
"Coverting JSON to a dataframe without a preprocessor. This could have unintended results."
@@ -99,15 +95,31 @@
else:
df = preprocessor(ds.id, records)
else:
raise Exception(f"Invalid file_type for {ds.id}: {ds.file_type}")

# make column names more sql-friendly
columns = {
column: column.strip().replace("-", "_").replace("'", "_").replace(" ", "_")
for column in df.columns
}
df.rename(columns=columns, inplace=True)
pg_client.insert_dataframe(df, ds_table_name)
if engine == "duckdb":
duckdb.copy_file_to_table(
local_dataset_path, ds_table_name, pg_client=pg_client
)
df = None
else:
if ds.file_type == DatasetType.csv:
raw_df = pd.read_csv(local_dataset_path, dtype=str)
elif ds.file_type == DatasetType.parquet:
raw_df = geoparquet.read_df(local_dataset_path)
else:
raise Exception(f"Invalid file_type for {ds.id}: {ds.file_type}")
df = preprocessor(ds.id, raw_df) if has_preprocessor else raw_df

if df is not None:
# make column names more sql-friendly
columns = {
column: column.strip()
.replace("-", "_")
.replace("'", "_")
.replace(" ", "_")
for column in df.columns
}
df.rename(columns=columns, inplace=True)
pg_client.insert_dataframe(df, ds_table_name)

if include_ogc_fid_col:
# This maybe should be applicable to pg_dumps, but they tend to have this column already
33 changes: 33 additions & 0 deletions dcpy/utils/duckdb.py
@@ -1,5 +1,8 @@
import duckdb # type: ignore
import os
from pathlib import Path

from dcpy.utils.postgres import PostgresClient


def setup_s3_secret(conn: duckdb.DuckDBPyConnection | None = None) -> None:
@@ -15,3 +18,33 @@ def setup_s3_secret(conn: duckdb.DuckDBPyConnection | None = None) -> None:
);
"""
)


def setup_postgres(database: str | None = None):
database = database or os.environ["BUILD_ENGINE_DB"]
duckdb.sql("INSTALL spatial;")
duckdb.sql("LOAD spatial;")
duckdb.sql("INSTALL postgres;")
duckdb.sql("LOAD postgres;")
duckdb.sql(f"""
CREATE SECRET postgres_build_engine (
TYPE POSTGRES,
HOST '{os.environ["BUILD_ENGINE_HOST"]}',
PORT '{os.environ["BUILD_ENGINE_PORT"]}',
DATABASE '{database}',
USER '{os.environ["BUILD_ENGINE_USER"]}',
PASSWORD '{os.environ["BUILD_ENGINE_PASSWORD"]}'
);""")
duckdb.sql("ATTACH '' AS pg (TYPE POSTGRES, SECRET postgres_build_engine);")

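# Once setup_postgres() has run, tables in the build database are addressable from
# DuckDB SQL as pg.<schema>.<table> (see copy_file_to_table below).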

def copy_file_to_table(
filepath: Path, table_name: str, pg_client: PostgresClient | None = None
):
if pg_client:
table_name = f"pg.{pg_client.schema}.{table_name}"
duckdb.sql(f"CREATE TABLE {table_name} AS SELECT * FROM '{filepath}'")

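# Example usage (illustrative; the path, table name, and `client`, a PostgresClient
# instance, are assumptions):
#     copy_file_to_table(Path("buildings.parquet"), "overture_buildings")
#     copy_file_to_table(Path("buildings.parquet"), "buildings", pg_client=client)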

setup_s3_secret()
setup_postgres()