
Commit 147c394
Parallel mbtiles creation bugfixes (#11)
kueda authored Mar 4, 2023
1 parent 74b256d commit 147c394
Showing 13 changed files with 60 additions and 36 deletions.
12 changes: 9 additions & 3 deletions packs.py
@@ -22,7 +22,6 @@
 import shutil
 import re
 
-from database import make_database
 from elevation import make_contours
 from osm import make_ways, make_context
 from rocks import make_rocks
@@ -38,6 +37,7 @@
 ATTRIBUTES_FOR_METADATA = [
     "admin1",
     "admin2",
+    "bbox",
     "description",
     "id",
     "name"
@@ -74,6 +74,13 @@ def list_packs():
         print(f"\t{pack_id}: {pack['description']}")
 
 
+def get_pack(pack_id):
+    """Get a pack by pack ID or raise a human-readable exception"""
+    try:
+        return PACKS[pack_id]
+    except KeyError as exc:
+        raise Exception(f"Pack {pack_id} does not exist") from exc
+
 def get_build_dir():
     """Return absolute path to the directory where pack files will be written"""
     return os.path.join(pathlib.Path(__file__).parent.absolute(), "build")
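A quick usage sketch of the new helper (the pack ID below is hypothetical):

    # An unknown pack ID now fails with a readable message instead of a
    # bare KeyError:
    get_pack("no-such-pack")
    # => Exception: Pack no-such-pack does not exist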
@@ -83,7 +90,7 @@ def add_metadata_to_pack(pack):
     """Augments a pack dict that has info about a specific file with descriptive static metadata"""
     if pack["id"] not in PACKS:
         return pack
-    full_pack = PACKS[pack["id"]]
+    full_pack = get_pack(pack["id"])
     filtered = dict((k, full_pack[k]) for k in ATTRIBUTES_FOR_METADATA if k in full_pack)
     return {
         **pack,
@@ -259,7 +266,6 @@ def make_pack(pack_id, clean=False, clean_rocks=False, clean_water=False,
               clean_ways=False, clean_context=False, clean_contours=False,
               procs=2):
     """Generate a pack and write it to the build directory"""
-    # make_database()
     pack_dir = get_pack_dir(pack_id)
     make_rocks_for_pack(pack_id, clean=(clean or clean_rocks), procs=procs)
     make_water_for_pack(pack_id, clean=(clean or clean_water), procs=procs)
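For reference, a hedged example of invoking the updated entry point (the pack ID is invented; the keyword names come from the signature above):

    # Rebuild only the rock layer of one pack with 4 worker processes:
    make_pack("example-pack", clean_rocks=True, procs=4)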
14 changes: 7 additions & 7 deletions requirements.txt
@@ -1,11 +1,11 @@
 aiofiles
 Fiona
-Shapely
-psycopg2
-pandas
-requests
-tqdm
 httpx
 mercantile
-aiofiles
-supermercado
+pandas
+psycopg2
+pytest
+requests
+Shapely
+supermercado
+tqdm
25 changes: 15 additions & 10 deletions rocks.py
@@ -18,7 +18,7 @@
 from sources import util
 from sources.util import rocks
 from sources.util.citations import load_citation_for_source, CITATIONS_TABLE_NAME
-from database import DBNAME, SRID
+from database import DBNAME, SRID, make_database
 
 NUM_PROCESSES = 4
 
@@ -152,27 +152,31 @@ def process_source(source_identifier, clean=False):
             dbname=DBNAME
         )
         util.log("Repairing invalid geometries...")
-        util.run_sql(
-            f"UPDATE {work_source_table_name} SET geom = ST_MakeValid(geom) WHERE NOT ST_IsValid(geom)"
-        )
+        util.run_sql(f"""
+            UPDATE {work_source_table_name}
+            SET geom = ST_MakeValid(geom)
+            WHERE NOT ST_IsValid(geom)
+        """)
         util.log("Removing polygon overlaps...")
         remove_polygon_overlaps(work_source_table_name)
         util.run_sql(
             f"DELETE FROM {work_source_table_name} "
             "WHERE ST_GeometryType(geom) = 'ST_GeometryCollection'"
         )
-        util.run_sql(
-            f"UPDATE {work_source_table_name} SET geom = ST_MakeValid(geom) WHERE NOT ST_IsValid(geom)"
-        )
+        util.run_sql(f"""
+            UPDATE {work_source_table_name}
+            SET geom = ST_MakeValid(geom)
+            WHERE NOT ST_IsValid(geom)
+        """)
         load_citation_for_source(source_identifier)
         util.log(f"Finished processing source: {source_identifier}")
-    except subprocess.CalledProcessError as e:
+    except subprocess.CalledProcessError as process_error:
         # If you don't do this, exceptions in subprocesses may not print stack
         # traces
-        print(f"Failed to process {source_identifier}")
+        print(f"Failed to process {source_identifier}. Error: {process_error}")
         traceback.print_exc()
         print()
-        raise e
+        raise process_error
 
 
 def clip_source_polygons_by_mask(
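Aside: the ST_MakeValid repair runs both before and after overlap removal, presumably because removing overlaps can itself produce invalid geometries. A minimal sketch for spot-checking a table by hand (the table name is hypothetical; the connection is assumed to match database.py):

    import psycopg2
    from database import DBNAME

    with psycopg2.connect(dbname=DBNAME) as conn:
        with conn.cursor() as cur:
            # Count geometries PostGIS still considers invalid; expect 0
            # after the ST_MakeValid pass has run.
            cur.execute(
                "SELECT COUNT(*) FROM my_work_table WHERE NOT ST_IsValid(geom)"
            )
            print(cur.fetchone()[0])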
@@ -356,6 +360,7 @@ def make_rocks(
     bbox=None
 ):
     """Make rocks MBTiles from a collection of sources"""
+    make_database()
     if clean:
         clean_sources(sources)
     load_units(sources, clean=clean, procs=procs)
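With make_database() moved from make_pack into make_rocks, rock tile generation ensures the database exists even when run on its own. A hedged usage sketch (the source ID and keyword arguments are assumptions, not taken from this diff):

    from rocks import make_rocks

    # Builds the rocks MBTiles for a single source; creates the PostGIS
    # database first if it does not already exist.
    make_rocks(["of97_744"], procs=4)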
2 changes: 1 addition & 1 deletion sources/of97_744/__init__.py
@@ -26,7 +26,7 @@ def run():
     dir_path = "of97-744-shapefiles"
     if not os.path.isdir(dir_path):
         print("EXTRACTING ARCHIVE...")
-        util.call_cmd(["unzip", download_path])
+        util.unzip(download_path)
 
     # In this case we're converting the existing shapefile to JSON and
     # translating some coded data into a usable form in one step
2 changes: 1 addition & 1 deletion sources/ofr20151175.py
@@ -26,7 +26,7 @@ def get_archive(url):
     gdb_path = "JOTR_OFR_v10-2.gdb"
     if not os.path.isdir(gdb_path):
         print("EXTRACTING ARCHIVE...")
-        util.call_cmd(["unzip", download_path])
+        util.unzip(download_path)
     return os.path.realpath(gdb_path)
 
 
2 changes: 1 addition & 1 deletion sources/rgm_004/__init__.py
@@ -27,7 +27,7 @@ def get_archive(url, data_path):
     is_extracted = os.path.isfile(data_path)
     if not is_extracted:
         print("EXTRACTING ARCHIVE...")
-        util.call_cmd(["unzip", download_path])
+        util.unzip(download_path)
     return os.path.realpath(data_path)
 
 
2 changes: 1 addition & 1 deletion sources/sim3143.py
@@ -28,7 +28,7 @@ def fetch(url):
     shp_dir_path = "HawaiiStateGeologicMap_GeMS-open"
     if not os.path.isdir(shp_dir_path):
         util.log("EXTRACTING ARCHIVE...")
-        util.call_cmd(["unzip", download_path])
+        util.unzip(download_path)
     return os.path.realpath(shp_dir_path)
 
 
6 changes: 5 additions & 1 deletion sources/util/__init__.py
@@ -76,7 +76,7 @@ def run_sql_with_retries(
     """Run a SQL statement with backoff retries"""
     try:
         return run_sql(sql, dbname=dbname, quiet=quiet)
-    except psycopg2.OperationalError as pg_err:
+    except (psycopg2.OperationalError, psycopg2.IntegrityError) as pg_err:
         if retry > max_retries:
             raise pg_err
         sleep = retry ** 3
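Catching IntegrityError here plausibly covers parallel workers colliding on writes and DDL. The surrounding lines imply a cubic backoff; a minimal self-contained sketch of that pattern (names are illustrative, not from this module):

    import time

    def with_retries(fn, max_retries=3):
        retry = 1
        while True:
            try:
                return fn()
            except Exception as err:  # the real code catches specific psycopg2 errors
                if retry > max_retries:
                    raise err
                time.sleep(retry ** 3)  # back off 1s, 8s, 27s, ...
                retry += 1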
@@ -271,3 +271,7 @@ def add_table_from_query_to_mbtiles(
         mbtiles_path,
         sql
     ], check=True)
+
+def unzip(path):
+    """Unzip a zip file at a path, updating if necessary"""
+    return call_cmd(["unzip", "-u", path])
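The -u flag is likely the heart of this fix: it tells unzip to update existing files and create missing ones, so a second worker extracting the same archive neither re-extracts everything nor hangs on an interactive "replace?" prompt. Usage sketch (the path is hypothetical):

    from sources.util import unzip

    unzip("work/some_source/archive.zip")  # safe to call repeatedly or in parallel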
4 changes: 2 additions & 2 deletions sources/util/citations.py
@@ -1,4 +1,4 @@
-from . import make_work_dir, run_sql, log
+from . import make_work_dir, run_sql, run_sql_with_retries, log
 from psycopg2.errors import UndefinedTable
 import json
 import os
@@ -11,7 +11,7 @@
 
 def create_table():
     """Create the citations table in the database"""
-    run_sql(f"""
+    run_sql_with_retries(f"""
         CREATE TABLE IF NOT EXISTS {CITATIONS_TABLE_NAME} (
             source VARCHAR(255),
             citation TEXT)
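Retrying CREATE TABLE IF NOT EXISTS makes sense under parallelism: when two sessions create the same table at once, PostgreSQL can still raise a duplicate-key error on its internal catalogs, which psycopg2 surfaces as an IntegrityError, exactly the class run_sql_with_retries now absorbs. A hedged reproduction sketch (the table name is made up):

    from multiprocessing import Pool
    from sources.util import run_sql_with_retries

    def create(_):
        run_sql_with_retries(
            "CREATE TABLE IF NOT EXISTS demo_citations (source VARCHAR(255))"
        )

    if __name__ == "__main__":
        with Pool(4) as pool:
            pool.map(create, range(4))  # occasionally races without the retry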
12 changes: 10 additions & 2 deletions sources/util/rocks/__init__.py
@@ -7,7 +7,15 @@
 
 import xml.etree.ElementTree as ET
 
-from .. import call_cmd, extless_basename, extract_e00, make_work_dir, met2xml, polygonize_arcs
+from .. import (
+    call_cmd,
+    extless_basename,
+    extract_e00,
+    make_work_dir,
+    met2xml,
+    polygonize_arcs,
+    unzip
+)
 from ..proj import NAD27_UTM10_PROJ4, SRS
 from .constants import *
 
@@ -428,7 +436,7 @@ def process_usgs_source(
     ):
         call_cmd(["tar", "xzvf", download_path])
     elif use_unzip:
-        call_cmd(["unzip", download_path])
+        unzip(download_path)
     else:
         copy_path = download_path + "-copy"
         call_cmd(["cp", download_path, copy_path])
2 changes: 1 addition & 1 deletion sources/util/tiger_water.py
@@ -26,7 +26,7 @@ def download(fips):
         log(f"Archive already extracted at {shp_path}, skipping...")
     else:
         log("EXTRACTING ARCHIVE...")
-        call_cmd(["unzip", "-o", download_path, "-d", work_path])
+        call_cmd(["unzip", "-u", "-o", download_path, "-d", work_path])
 
 
 def make_gpkg(fips, dst_path, append=False):
7 changes: 4 additions & 3 deletions sources/util/usgs_states.py
@@ -9,7 +9,8 @@
 from . import (
     call_cmd,
     log,
-    make_work_dir
+    make_work_dir,
+    unzip
 )
 from .proj import SRS as DEST_SRS
 from .rocks import (
@@ -34,7 +35,7 @@ def download_shapes(state, base_url):
     shp_path = f"{state.lower()}geol_poly_dd.shp"
     if not os.path.isfile(shp_path):
         log("EXTRACTING ARCHIVE...")
-        call_cmd(["unzip", download_path])
+        unzip(download_path)
     return os.path.realpath(shp_path)
 
 
@@ -49,7 +50,7 @@ def download_attributes(state, base_url):
     csv_path = f"{state}units.csv"
     if not os.path.isfile(csv_path):
         log("EXTRACTING ARCHIVE...")
-        call_cmd(["unzip", download_path])
+        unzip(download_path)
     if not os.path.isfile(csv_path):
         csv_path = re.sub(f'^{state}', state.lower(), csv_path)
     if not os.path.isfile(csv_path):
6 changes: 3 additions & 3 deletions sources/util/water.py
@@ -8,7 +8,7 @@
 
 import xml.etree.ElementTree as ET
 
-from . import call_cmd, extless_basename, log, make_work_dir
+from . import call_cmd, extless_basename, log, make_work_dir, unzip
 from .proj import GRS80_LONGLAT, SRS
 
 WATERWAYS_FNAME = "waterways.gpkg"
@@ -38,7 +38,7 @@ def process_omca_creeks_source(url, dir_name, waterways_shp_path,
     dir_path = dir_name
     if not os.path.isdir(dir_path):
         print("EXTRACTING ARCHIVE...")
-        call_cmd(["unzip", download_path])
+        unzip(download_path)
 
     # Project into EPSG 4326 along with name, type, and natural attributes
     waterways_gpkg_path = WATERWAYS_FNAME
@@ -419,7 +419,7 @@ def process_nhdplus_hr_source(
         log(f"Archive already extracted at {gdb_path}, skipping...")
     else:
         log("EXTRACTING ARCHIVE...")
-        call_cmd(["unzip", "-o", download_path])
+        call_cmd(["unzip", "-u", "-o", download_path])
     process_nhdplus_hr_source_waterways(gdb_path, srs)
     process_nhdplus_hr_source_waterbodies(gdb_path, srs)
     process_nhdplus_hr_source_watersheds(gdb_path, srs)
