-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharcgis_feature_layer.py
255 lines (199 loc) · 7.86 KB
/
arcgis_feature_layer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# requirements:
# psycopg2-binary
# requests~=2.32
import logging
import os
from pathlib import Path
import requests
from f.common_logic.db_operations import postgresql
from f.common_logic.save_disk import save_data_to_file
from f.connectors.geojson.geojson_to_postgres import main as save_geojson_to_postgres
# type names that refer to Windmill Resources
# `c_arcgis_account` is a plain alias of the builtin `dict`; it is used below as
# the annotation of the `arcgis_account` parameters.
c_arcgis_account = dict
# Configure the root logger at INFO level, then create the logger used by the
# functions in this module (named after the module itself).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def main(
    arcgis_account: c_arcgis_account,
    feature_layer_url: str,
    db: postgresql,
    db_table_name: str,
    attachment_root: str = "/persistent-storage/datalake",
):
    """
    Copy an ArcGIS feature layer, including its attachments, to disk and Postgres.

    The steps are: obtain a token, fetch the layer's features, download each
    feature's attachments, replace the feature IDs with global IDs, write the
    result as a GeoJSON FeatureCollection, and hand that file to the
    GeoJSON-to-Postgres connector.

    Parameters
    ----------
    arcgis_account : dict
        ArcGIS credentials, forwarded unchanged to `get_arcgis_token`.
    feature_layer_url : str
        The URL of the ArcGIS feature layer.
    db : postgresql
        Postgres resource, forwarded unchanged to the GeoJSON-to-Postgres connector.
    db_table_name : str
        Forwarded to both save helpers; also used as the name of the directory
        under `attachment_root` and as the stem of the `.geojson` path.
    attachment_root : str
        Root directory under which the per-table directory lives.
    """
    table_dir = Path(attachment_root) / db_table_name
    # Path of the GeoJSON file relative to `attachment_root`, as expected by the
    # Postgres connector (presumably where `save_data_to_file` writes it).
    geojson_rel_path = Path(db_table_name) / f"{db_table_name}.geojson"

    token = get_arcgis_token(arcgis_account)
    layer_features = get_features_from_arcgis(feature_layer_url, token)
    layer_features = download_feature_attachments(
        layer_features, feature_layer_url, token, table_dir
    )
    layer_features = set_global_id(layer_features)

    # From here on the data is plain GeoJSON and nothing more is needed from the
    # ArcGIS REST API: persist it to a file first, then load it into Postgres.
    feature_collection = {"type": "FeatureCollection", "features": layer_features}
    save_data_to_file(
        feature_collection, db_table_name, table_dir, file_type="geojson"
    )

    delete_after_import = False  # do not delete the file after saving to Postgres
    save_geojson_to_postgres(
        db,
        db_table_name,
        geojson_rel_path,
        attachment_root,
        delete_after_import,
    )
def get_arcgis_token(arcgis_account: c_arcgis_account):
    """
    Generate an ArcGIS token using the provided account credentials.

    Parameters
    ----------
    arcgis_account : dict
        A dictionary containing the ArcGIS account credentials with keys
        "username" and "password".

    Returns
    -------
    str
        The generated ArcGIS token.

    Raises
    ------
    KeyError
        If `arcgis_account` lacks the "username" or "password" key.
    ValueError
        If the response does not contain a token (for example because the
        credentials were rejected). The message includes the error message
        returned by ArcGIS when there is one.
    """
    arcgis_username = arcgis_account["username"]
    arcgis_password = arcgis_account["password"]

    # According to the ArcGIS REST API documentation, you can set `client` to `requestip`
    # to generate a token based on the IP address of the request. However, this does not
    # seem to work well, neither in local development nor in production. Therefore, we use
    # `referer` as the client type, and use the base URL of the Windmill app as the referer.
    # https://developers.arcgis.com/rest/services-reference/enterprise/generate-token/
    token_response = requests.post(
        "https://www.arcgis.com/sharing/rest/generateToken",
        data={
            "username": arcgis_username,
            "password": arcgis_password,
            "client": "referer",
            "referer": os.environ.get("WM_BASE_URL"),
            "f": "json",
        },
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=60,
    )

    try:
        payload = token_response.json()
    except ValueError:  # the body is not JSON (e.g. an HTML error page)
        payload = None
    if not isinstance(payload, dict):
        payload = {}

    arcgis_token = payload.get("token")
    if not arcgis_token:
        # A failed request carries an "error" object instead of a token. Fail here,
        # with that message, rather than returning None and letting a later
        # request fail for a less obvious reason.
        error = payload.get("error")
        if isinstance(error, dict):
            error_message = error.get("message", "Unknown error")
        else:
            error_message = f"Unknown error (HTTP {token_response.status_code})"
        raise ValueError(f"Error generating ArcGIS token: {error_message}")

    return arcgis_token
def get_features_from_arcgis(feature_layer_url: str, arcgis_token: str):
    """
    Fetch all features from an ArcGIS feature layer using the provided token.

    The layer at index 0 of the service is queried in GeoJSON format. When the
    service reports that a response was truncated (`exceededTransferLimit`),
    further pages are requested with `resultOffset` until every feature has
    been retrieved.

    Parameters
    ----------
    feature_layer_url : str
        The URL of the ArcGIS feature layer.
    arcgis_token : str
        The ArcGIS token for authentication.

    Returns
    -------
    list
        A list of GeoJSON features retrieved from the ArcGIS feature layer.

    Raises
    ------
    ValueError
        If a response has a non-200 status, is not a JSON object, or contains
        an "error" entry.
    """
    query_url = f"{feature_layer_url}/0/query"
    base_params = {
        "where": "1=1",  # get all features
        "outFields": "*",  # get all fields
        "returnGeometry": "true",
        "f": "geojson",
        "token": arcgis_token,
    }

    features = []
    while True:
        params = dict(base_params)
        if features:
            # Only later pages carry an offset, so the first request is a plain
            # "give me everything" query.
            params["resultOffset"] = len(features)

        # Without a timeout, requests waits forever on an unresponsive server.
        response = requests.get(query_url, params=params, timeout=120)

        try:
            payload = response.json()
        except ValueError:  # the body is not JSON (e.g. an HTML error page)
            payload = None

        # ArcGIS sometimes returns 200 with an error message e.g. if a token is invalid
        if (
            response.status_code != 200
            or not isinstance(payload, dict)
            or "error" in payload
        ):
            error = payload.get("error") if isinstance(payload, dict) else None
            if isinstance(error, dict):
                error_message = error.get("message", "Unknown error")
            else:
                error_message = "Unknown error"
            raise ValueError(f"Error fetching features: {error_message}")

        page = payload.get("features") or []
        features.extend(page)

        # The truncation flag is looked up both at the top level and inside the
        # collection's "properties" object, since its position depends on the
        # output format.
        collection_properties = payload.get("properties")
        exceeded_transfer_limit = bool(payload.get("exceededTransferLimit")) or (
            isinstance(collection_properties, dict)
            and bool(collection_properties.get("exceededTransferLimit"))
        )

        # An empty page also ends the loop, so a service that keeps setting the
        # flag cannot make this spin forever.
        if not page or not exceeded_transfer_limit:
            break

    logger.info(f"{len(features)} features fetched from the ArcGIS feature layer")
    return features
def download_feature_attachments(
    features: list, feature_layer_url: str, arcgis_token: str, storage_path: str
):
    """
    Download attachments for each feature and save them to the specified directory.

    For every attachment, two properties are added to the owning feature:
    `<keywords>_filename` and `<keywords>_content_type`, where `<keywords>` is
    the attachment's "keywords" value. Files are written to
    `<storage_path>/attachments/<attachment name>`; a file that already exists
    is not downloaded again.

    Parameters
    ----------
    features : list
        A list of features for which attachments need to be downloaded. Each
        feature must have an "objectid" entry in its "properties".
    feature_layer_url : str
        The URL of the ArcGIS feature layer.
    arcgis_token : str
        The ArcGIS token for authentication.
    storage_path : str
        The directory where attachments should be saved (a `pathlib.Path` is
        accepted as well).

    Returns
    -------
    list
        The same list of features, updated in place with attachment information.

    Raises
    ------
    requests.HTTPError
        If an attachment request returns an HTTP error status.
    """
    # Without a timeout, requests waits forever on an unresponsive server.
    request_timeout = 120
    request_params = {"f": "json", "token": arcgis_token}
    attachments_dir = Path(storage_path) / "attachments"

    total_downloaded_attachments = 0
    skipped_attachments = 0
    failed_attachments = 0

    for feature in features:
        object_id = feature["properties"]["objectid"]
        attachments_url = f"{feature_layer_url}/0/{object_id}/attachments"

        attachments_response = requests.get(
            attachments_url, params=request_params, timeout=request_timeout
        )
        attachments_response.raise_for_status()
        attachments_payload = attachments_response.json()

        # ArcGIS can answer 200 with an error body; report it instead of
        # silently treating it as "no attachments". The run still continues.
        if "error" in attachments_payload:
            listing_error = attachments_payload["error"]
            if isinstance(listing_error, dict):
                listing_message = listing_error.get("message", "Unknown error")
            else:
                listing_message = "Unknown error"
            logger.warning(
                f"Could not list attachments for object_id {object_id}: {listing_message}"
            )
            continue

        attachments = attachments_payload.get("attachmentInfos", [])
        if not attachments:
            logger.info(f"No attachments found for object_id {object_id}")
            continue

        for attachment in attachments:
            attachment_id = attachment["id"]
            attachment_name = attachment["name"]
            attachment_content_type = attachment["contentType"]
            attachment_keywords = attachment["keywords"]

            # The feature records the attachment even when the file is already
            # on disk and the download below is skipped.
            feature["properties"][f"{attachment_keywords}_filename"] = attachment_name
            feature["properties"][f"{attachment_keywords}_content_type"] = (
                attachment_content_type
            )

            attachment_path = attachments_dir / attachment_name
            if attachment_path.exists():
                logger.debug(
                    f"File already exists, skipping download: {attachment_path}"
                )
                skipped_attachments += 1
                continue

            attachment_response = requests.get(
                f"{attachments_url}/{attachment_id}",
                params=request_params,
                timeout=request_timeout,
            )
            attachment_response.raise_for_status()
            attachment_data = attachment_response.content

            # A 200 response may carry an ArcGIS error object instead of the file.
            # Writing it would store garbage under the attachment's name, and later
            # runs would skip it as "already exists". Only bodies that look like a
            # JSON object are inspected, and only when the attachment itself is not
            # declared as JSON.
            looks_like_json_object = attachment_data[:64].lstrip().startswith(b"{")
            declared_as_json = "json" in str(attachment_content_type).lower()
            if looks_like_json_object and not declared_as_json:
                try:
                    error_body = attachment_response.json()
                except ValueError:
                    error_body = None
                if isinstance(error_body, dict) and isinstance(
                    error_body.get("error"), dict
                ):
                    download_message = error_body["error"].get(
                        "message", "Unknown error"
                    )
                    logger.warning(
                        f"Could not download attachment {attachment_name} "
                        f"for object_id {object_id}: {download_message}"
                    )
                    failed_attachments += 1
                    continue

            attachment_path.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling ".part" file and rename it afterwards, so an
            # interrupted write never leaves a truncated file under the final name.
            partial_path = attachment_path.with_name(attachment_path.name + ".part")
            partial_path.write_bytes(attachment_data)
            partial_path.replace(attachment_path)

            logger.info(
                f"Downloaded attachment {attachment_name} (content type: {attachment_content_type})"
            )
            total_downloaded_attachments += 1

    logger.info(f"Total downloaded attachments: {total_downloaded_attachments}")
    logger.info(f"Total skipped attachments: {skipped_attachments}")
    if failed_attachments:
        logger.warning(f"Total failed attachments: {failed_attachments}")
    return features
def set_global_id(features: list):
    """
    Replace each feature's ID with its global ID (a uuid).

    By default the feature ID is the ArcGIS object ID, an integer that is
    incremented by 1 for each feature. ArcGIS also assigns every feature a
    global ID, which identifies it more reliably; using it as the feature ID
    is consistent with how other data is stored in the data warehouse.

    https://support.esri.com/en-us/gis-dictionary/globalid

    Parameters
    ----------
    features : list
        The features to update. Each one must have a "globalid" entry in its
        "properties".

    Returns
    -------
    list
        The same list, whose features were updated in place.
    """
    for item in features:
        item_properties = item["properties"]
        global_id = item_properties["globalid"]
        item["id"] = global_id
    return features