Skip to content

Commit 69490d5

Browse files
authored
Add flexibility to ArcGIS Feature Services connector (#1054)
For ArcGIS Feature Services:
* add additional test resources (as well as for Socrata)
* rework connector to use more specific types and validate layer information
* update library templates
* add cron workflow to pull data weekly
1 parent 0f8b8dc commit 69490d5

30 files changed

+984
-88
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
name: Data Library - 📁 ArcGIS Feature Server Updates
2+
3+
on:
4+
schedule:
5+
# Weekly Cron job
6+
- cron: "0 0 * * 0"
7+
workflow_dispatch:
8+
inputs:
9+
dev_image:
10+
description: "Use dev image specific to this branch? (If exists)"
11+
type: boolean
12+
required: true
13+
default: false
14+
15+
jobs:
16+
dataloading:
17+
runs-on: ubuntu-22.04
18+
container:
19+
image: nycplanning/build-base:${{ inputs.dev_image && format('dev-{0}', github.head_ref || github.ref_name) || 'latest' }}
20+
env:
21+
AWS_S3_BUCKET: edm-recipes
22+
strategy:
23+
matrix:
24+
dataset:
25+
- dcp_cscl_commonplace
26+
- dcp_cscl_complex
27+
- dcp_waterfront_access_map_wpaa
28+
- dcp_waterfront_access_map_pow
29+
- nysdec_freshwater_wetlands_checkzones
30+
- nysdec_freshwater_wetlands
31+
- nysdec_natural_heritage_communities
32+
- nysdec_priority_lakes
33+
- nysdec_priority_estuaries
34+
- nysdec_priority_streams
35+
- nysdec_tidal_wetlands
36+
- nysparks_historicplaces
37+
- nysparks_parks
38+
- nysshpo_historic_buildings_points
39+
- nysshpo_historic_buildings_polygons
40+
- nysshpo_archaeological_buffer_areas
41+
- usnps_parks
42+
fail-fast: false
43+
44+
steps:
45+
- uses: actions/checkout@v4
46+
47+
- name: Load Secrets
48+
uses: 1password/load-secrets-action@v1
49+
with:
50+
export-env: true
51+
env:
52+
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
53+
AWS_S3_ENDPOINT: "op://Data Engineering/DO_keys/AWS_S3_ENDPOINT"
54+
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
55+
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
56+
BUILD_ENGINE_SERVER: "op://Data Engineering/EDM_DATA/server_url"
57+
58+
- name: Finish container setup ...
59+
working-directory: ./
60+
run: ./bash/docker_container_setup.sh
61+
62+
- name: Library Archive
63+
run: library archive --name ${{ matrix.dataset }} --latest --s3
64+
65+
create_issue_on_failure:
66+
needs: dataloading
67+
runs-on: ubuntu-22.04
68+
if: ${{ failure() && (github.event_name == 'schedule') }}
69+
steps:
70+
- uses: actions/checkout@v4
71+
with:
72+
sparse-checkout: .github
73+
- name: Create issue on failure
74+
uses: JasonEtco/create-an-issue@v2
75+
env:
76+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
77+
ACTION: ${{ github.workflow }}
78+
BUILD_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}
79+
with:
80+
filename: .github/ISSUE_TEMPLATE/scheduled_action_failure.md

dcpy/connectors/esri/arcgis_feature_service.py

Lines changed: 124 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,41 +12,149 @@
1212
)
1313
import yaml
1414

15-
from dcpy.models.connectors.esri import FeatureServer
15+
from dcpy.models.connectors.esri import FeatureServer, FeatureServerLayer
1616
import dcpy.models.product.dataset.metadata as models
1717
from dcpy.utils.logging import logger
1818

1919

20-
def get_metadata(dataset: FeatureServer) -> dict:
21-
resp = requests.get(f"{dataset.url}", params={"f": "pjson"})
20+
def get_feature_server_metadata(feature_server: FeatureServer) -> dict:
21+
"""Given a FeatureServer, return its metadata as a dictionary"""
22+
resp = requests.get(f"{feature_server.url}", params={"f": "pjson"})
2223
resp.raise_for_status()
2324
error = resp.json().get("error") # 200 responses might contain error details
2425
if error:
2526
raise Exception(f"Error fetching ESRI Server metadata: {error}")
2627
return resp.json()
2728

2829

29-
def get_data_last_updated(dataset: FeatureServer) -> datetime:
30-
metadata = get_metadata(dataset)
30+
def get_feature_server_layers(
31+
feature_server: FeatureServer,
32+
) -> list[FeatureServerLayer]:
33+
"""Given a FeatureServer, look up and return its available layers"""
34+
resp = get_feature_server_metadata(feature_server)
35+
return [
36+
FeatureServerLayer(
37+
server=feature_server.server,
38+
name=feature_server.name,
39+
layer_name=l["name"],
40+
layer_id=l["id"],
41+
)
42+
for l in resp["layers"]
43+
]
44+
45+
46+
def resolve_layer(
47+
feature_server: FeatureServer,
48+
layer_name: str | None = None,
49+
layer_id: int | None = None,
50+
) -> FeatureServerLayer:
51+
"""
52+
Given a FeatureServer, and optional layer name or id, resolve layer information
53+
There are a few different modes depending on what is provided
54+
For all modes, layers for the feature_server are looked up. Then, if
55+
- layer_name and layer_id provided - validate layer exists, return it
56+
- layer_name or layer_id provided - lookup layer by provided key
57+
- neither provided - if feature_server has single layer, return it. Otherwise, error
58+
59+
`assert` statements can hopefully be dropped - known bug in mypy to not correctly
60+
narrow types within tuples. See final comments in https://github.com/python/mypy/issues/12364
61+
"""
62+
layers = get_feature_server_layers(feature_server)
63+
layer_labels = [l.layer_label for l in layers]
64+
65+
match layer_id, layer_name:
66+
case None, None:
67+
if len(layers) > 1:
68+
raise ValueError(
69+
f"Feature server {feature_server} has multiple layers: {layer_labels}"
70+
)
71+
elif len(layers) == 0:
72+
raise LookupError(f"Feature server {feature_server} has no layers")
73+
else:
74+
return layers[0]
75+
case _, None:
76+
assert layer_id is not None
77+
layers_by_id = {l.layer_id: l for l in layers}
78+
if layer_id in layers_by_id:
79+
return layers_by_id[layer_id]
80+
else:
81+
raise LookupError(
82+
f"Layer with id {layer_id} not found in feature server {feature_server}. Found layers: {layer_labels}."
83+
)
84+
case None, _:
85+
assert layer_name is not None
86+
layers_by_name = {l.layer_name: l for l in layers}
87+
if layer_name in layers_by_name:
88+
return layers_by_name[layer_name]
89+
else:
90+
raise LookupError(
91+
f"Layer with name '{layer_name}' not found in feature server {feature_server}. Found layers: {layer_labels}."
92+
)
93+
case _:
94+
assert layer_name is not None
95+
assert layer_id is not None
96+
layer = FeatureServerLayer(
97+
server=feature_server.server,
98+
name=feature_server.name,
99+
layer_name=layer_name,
100+
layer_id=layer_id,
101+
)
102+
if layer not in layers:
103+
raise LookupError(
104+
f"Layer '{layer}' not found in feature server {feature_server}"
105+
)
106+
return layer
107+
108+
109+
def get_layer_metadata(layer: FeatureServerLayer) -> dict:
110+
"""Given FeatureServerLayer, return its metadata as a dictionary"""
111+
resp = requests.get(f"{layer.url}", params={"f": "pjson"})
112+
resp.raise_for_status()
113+
error = resp.json().get("error") # 200 responses might contain error details
114+
if error:
115+
raise Exception(f"Error fetching ESRI Server metadata: {error}")
116+
return resp.json()
117+
118+
119+
def get_data_last_updated(layer: FeatureServerLayer) -> datetime:
120+
"""Given FeatureServerLayer, lookup date of last data edit"""
121+
metadata = get_layer_metadata(layer)
31122
## returned timestamp is in milliseconds, but fromtimestamp expects seconds
32123
return datetime.fromtimestamp(metadata["editingInfo"]["dataLastEditDate"] / 1e3)
33124

34125

35-
def query_dataset(dataset: FeatureServer, params: dict) -> dict:
36-
resp = requests.post(f"{dataset.url}/query", data=params)
126+
def query_layer(layer: FeatureServerLayer, params: dict) -> dict:
127+
"""
128+
Wrapper to query data for a FeatureServerLayer.
129+
Arguments are `layer`, a FeatureServerLayer, and `params`, which are kwargs for the api call
130+
131+
For these params, we commonly use
132+
- where: essentially a sql where clause. Default of "1=1" should be provided
133+
- outFields: fields to select. Required when querying data. Default of "*" should be provided
134+
- outSr: spatial reference system to get data in
135+
- f: output format. Default of "geojson" should be provided
136+
- returnIdsOnly: boolean flag to only return ids. Doesn't have same query limits as data queries
137+
- objectIds: list of object ids (that can be queried separately) to return. Useful in combination with returnIdsOnly to download a large layer in chunks
138+
139+
Exhaustive list of possible params are here:
140+
https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters
141+
"""
142+
resp = requests.post(f"{layer.url}/query", data=params)
37143
resp.raise_for_status()
38144
return resp.json()
39145

40146

41-
def get_dataset(dataset: FeatureServer, crs: int) -> dict:
42-
CHUNK_SIZE = 100
147+
def get_layer(layer: FeatureServerLayer, crs: int, chunk_size=100) -> dict:
148+
"""
149+
Given FeatureServerLayer and desired crs, fetches entire layer as geojson (dict)
150+
"""
43151
params = {"where": "1=1", "outFields": "*", "outSr": crs, "f": "geojson"}
44152

45153
# there is a limit of 2000 features on the server, unless we limit to objectIds only
46-
# so first, we get ids, then we chunk to get full dataset
154+
# so first, we get ids, then we chunk to get full layer
47155
obj_params = params.copy()
48156
obj_params["returnIdsOnly"] = True
49-
object_id_resp = query_dataset(dataset, obj_params)
157+
object_id_resp = query_layer(layer, obj_params)
50158
object_ids = cast(list[int], object_id_resp["properties"]["objectIds"])
51159

52160
features = []
@@ -60,17 +168,17 @@ def get_dataset(dataset: FeatureServer, crs: int) -> dict:
60168
transient=True,
61169
) as progress:
62170
task = progress.add_task(
63-
f"[green]Downloading [bold]{dataset.name}[/bold]", total=len(object_ids)
171+
f"[green]Downloading [bold]{layer.name}[/bold]", total=len(object_ids)
64172
)
65173

66174
def _downcase_properties_keys(feat):
67175
feat["properties"] = {k.lower(): v for k, v in feat["properties"].items()}
68176
return feat
69177

70-
for i in range(0, len(object_ids), CHUNK_SIZE):
71-
params["objectIds"] = object_ids[i : i + CHUNK_SIZE]
72-
chunk = query_dataset(dataset, params)
73-
progress.update(task, completed=i + CHUNK_SIZE)
178+
for i in range(0, len(object_ids), chunk_size):
179+
params["objectIds"] = object_ids[i : i + chunk_size]
180+
chunk = query_layer(layer, params)
181+
progress.update(task, completed=i + chunk_size)
74182
features += [_downcase_properties_keys(feat) for feat in chunk["features"]]
75183

76184
return {"type": "FeatureCollection", "crs": crs, "features": features}

dcpy/library/config.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,19 @@ def compute(self) -> DatasetDefinition:
8282
if _config.source.socrata:
8383
version = self.version_socrata(_config.source.socrata.uid)
8484
elif _config.source.arcgis_feature_server:
85+
fs = _config.source.arcgis_feature_server
86+
feature_server_layer = arcgis_feature_service.resolve_layer(
87+
feature_server=fs.feature_server,
88+
layer_name=fs.layer_name,
89+
layer_id=fs.layer_id,
90+
)
91+
_config.source.arcgis_feature_server = (
92+
DatasetDefinition.SourceSection.FeatureServerLayerDefinition(
93+
**feature_server_layer.model_dump()
94+
)
95+
)
8596
version = arcgis_feature_service.get_data_last_updated(
86-
_config.source.arcgis_feature_server
97+
feature_server_layer
8798
).strftime("%Y%m%d")
8899
else:
89100
# backwards compatibility before templates were simplified
@@ -135,8 +146,9 @@ def compute(self) -> DatasetDefinition:
135146
tmp_dir.mkdir(exist_ok=True)
136147
if not (config.source.geometry and config.source.geometry.SRS):
137148
raise Exception("Must provide source crs for arcgis feature server")
138-
geojson = arcgis_feature_service.get_dataset(
139-
config.source.arcgis_feature_server,
149+
150+
geojson = arcgis_feature_service.get_layer(
151+
feature_server_layer,
140152
crs=int(config.source.geometry.SRS.strip("EPSG:")),
141153
)
142154
file = tmp_dir / f"{config.name}.geojson"

dcpy/library/templates/dcp_waterfront_access_map_pow.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ dataset:
55
arcgis_feature_server:
66
server: dcp
77
name: nypubliclyownedwaterfront
8-
layer: 0
98
options:
109
- "AUTODETECT_TYPE=NO"
1110
- "EMPTY_STRING_AS_NULL=YES"

dcpy/library/templates/dcp_waterfront_access_map_wpaa.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ dataset:
55
arcgis_feature_server:
66
server: dcp
77
name: nywpaa
8-
layer: 0
98
options:
109
- "AUTODETECT_TYPE=NO"
1110
- "EMPTY_STRING_AS_NULL=YES"

dcpy/library/templates/nysdec_freshwater_wetlands.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dataset:
55
arcgis_feature_server:
66
server: nys_clearinghouse
77
name: State_Regulated_Freshwater_Wetlands
8-
layer: 0
8+
layer_name: State Regulated Freshwater Wetlands
99
geometry:
1010
SRS: EPSG:2263
1111
type: MULTIPOLYGON

dcpy/library/templates/nysdec_freshwater_wetlands_checkzones.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dataset:
55
arcgis_feature_server:
66
server: nys_clearinghouse
77
name: State_Regulated_Freshwater_Wetlands
8-
layer: 1
8+
layer_name: State Regulated Freshwater Wetlands Checkzone
99
geometry:
1010
SRS: EPSG:2263
1111
type: MULTIPOLYGON

dcpy/library/templates/nysdec_lands.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dataset:
55
arcgis_feature_server:
66
server: nys_clearinghouse
77
name: NYS_DEC_Lands
8-
layer: 0
8+
layer_id: 0
99
geometry:
1010
SRS: EPSG:26918
1111
type: MULTIPOLYGON

dcpy/library/templates/nysdec_priority_estuaries.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dataset:
55
arcgis_feature_server:
66
server: nys_clearinghouse
77
name: Waterbody_Inventory_List
8-
layer: 3
8+
layer_name: Priority Waterbody List - Estuaries
99
geometry:
1010
SRS: EPSG:2263
1111
type: MULTIPOLYGON

dcpy/library/templates/nysdec_priority_lakes.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ dataset:
55
arcgis_feature_server:
66
server: nys_clearinghouse
77
name: Waterbody_Inventory_List
8-
layer: 2
8+
layer_name: Priority Waterbody List - Lakes
99
geometry:
1010
SRS: EPSG:2263
1111
type: MULTIPOLYGON

0 commit comments

Comments
 (0)