Skip to content
This repository was archived by the owner on Feb 22, 2023. It is now read-only.

Commit dd2431e

Browse files
Correct data refresh status message next steps (#953)
* Correct status message order next steps * Add delete index recipe * Add slack "status" method for prepending model names * Further message improving, use new status method * Make parameters more consistent * Add one last "next" step * Differentiate between ingestion server steps and all steps
1 parent 40161d5 commit dd2431e

File tree

5 files changed

+44
-30
lines changed

5 files changed

+44
-30
lines changed

ingestion_server/ingestion_server/api.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,10 @@ def on_post(self, req, _):
269269
index_type = target_index.split("-")[0]
270270
if index_type not in MEDIA_TYPES:
271271
index_type = "image"
272-
slack.verbose(f"`{index_type}`: Elasticsearch reindex complete")
272+
slack.verbose(
273+
f"`{index_type}`: Elasticsearch reindex complete | "
274+
f"_Next: re-apply indices & constraints_"
275+
)
273276

274277
elasticsearch = elasticsearch_connect()
275278
indexer = TableIndexer(

ingestion_server/ingestion_server/indexer.py

+16-13
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ def point_alias(self, model_name: str, index_suffix: str, alias: str, **_):
378378
if alias_stat.exists:
379379
if not alias_stat.is_alias:
380380
# Alias is an index, this is fatal.
381-
message = f"There is an index named {alias}, cannot proceed."
381+
message = f"There is an index named `{alias}`, cannot proceed."
382382
log.error(message)
383383
slack.error(message)
384384
return
@@ -395,20 +395,23 @@ def point_alias(self, model_name: str, index_suffix: str, alias: str, **_):
395395
}
396396
)
397397
message = (
398-
f"Migrated alias {alias} "
399-
f"from index {curr_index} to index {dest_index}."
398+
f"Migrated alias `{alias}` from index `{curr_index}` to "
399+
f"index `{dest_index}` | _Next: delete old index_"
400400
)
401401
log.info(message)
402-
slack.info(message)
402+
slack.status(model_name, message)
403403
else:
404404
# Alias is already mapped.
405-
log.info(f"Alias {alias} already points to index {dest_index}.")
405+
log.info(
406+
f"`{model_name}`: Alias `{alias}` already points to "
407+
f"index `{dest_index}`."
408+
)
406409
else:
407410
# Alias does not exist, create it.
408411
self.es.indices.put_alias(index=dest_index, name=alias)
409-
message = f"Created alias {alias} pointing to index {dest_index}."
412+
message = f"Created alias `{alias}` pointing to index `{dest_index}`."
410413
log.info(message)
411-
slack.info(message)
414+
slack.status(model_name, message)
412415

413416
if self.progress is not None:
414417
self.progress.value = 100 # mark job as completed
@@ -441,7 +444,7 @@ def delete_index(
441444
if self.is_bad_request is not None:
442445
self.is_bad_request.value = 1
443446
message = (
444-
f"Alias {target} might be in use so it cannot be deleted. "
447+
f"Alias `{target}` might be in use so it cannot be deleted. "
445448
f"Verify that the API does not use this alias and then use the "
446449
f"`force_delete` parameter."
447450
)
@@ -455,24 +458,24 @@ def delete_index(
455458
if self.is_bad_request is not None:
456459
self.is_bad_request.value = 1
457460
message = (
458-
f"Index {target} is associated with aliases "
461+
f"Index `{target}` is associated with aliases "
459462
f"{target_stat.alt_names}, cannot delete. Delete aliases first."
460463
)
461464
log.error(message)
462465
slack.error(message)
463466
return
464467

465468
self.es.indices.delete(index=target)
466-
message = f"Index {target} was deleted."
469+
message = f"Index `{target}` was deleted - data refresh complete! :tada:"
467470
log.info(message)
468-
slack.info(message)
471+
slack.status(model_name, message)
469472
else:
470473
# Cannot delete as target does not exist.
471474
if self.is_bad_request is not None:
472475
self.is_bad_request.value = 1
473-
message = f"Target {target} does not exist and cannot be deleted."
476+
message = f"Target `{target}` does not exist and cannot be deleted."
474477
log.info(message)
475-
slack.info(message)
478+
slack.status(model_name, message)
476479

477480
if self.progress is not None:
478481
self.progress.value = 100

ingestion_server/ingestion_server/ingest.py

+10-16
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,10 @@ def refresh_api_table(
271271
"""
272272

273273
# Step 1: Get the list of overlapping columns
274-
slack.info(f"`{table}`: Starting data refresh | _Next: copying data from upstream_")
274+
slack.status(
275+
table,
276+
"Starting ingestion server data refresh | _Next: copying data from upstream_",
277+
)
275278
downstream_db = database_connect()
276279
upstream_db = psycopg2.connect(
277280
dbname=UPSTREAM_DB_NAME,
@@ -319,21 +322,15 @@ def refresh_api_table(
319322
log.info(f"Running copy-data query: \n{copy_data.as_string(downstream_cur)}")
320323
downstream_cur.execute(copy_data)
321324

322-
next_step = (
323-
"_Next: {starting data cleaning}_"
324-
if table == "image"
325-
else "Finished refreshing table"
326-
)
327-
slack.verbose(f"`{table}`: Data copy complete | {next_step}")
325+
next_step = "image data cleaning" if table == "image" else "Elasticsearch reindex"
326+
slack.status(table, f"Data copy complete | _Next: {next_step}_")
328327

329328
if table == "image":
330329
# Step 5: Clean the data
331330
log.info("Cleaning data...")
332331
clean_image_data(table)
333332
log.info("Cleaning completed!")
334-
slack.verbose(
335-
f"`{table}`: Data cleaning complete | " f"Finished refreshing table"
336-
)
333+
slack.status(table, "Data cleaning complete | _Next: Elasticsearch reindex_")
337334

338335
downstream_db.close()
339336
log.info(f"Finished refreshing table '{table}'.")
@@ -378,17 +375,14 @@ def promote_api_table(
378375
downstream_cur.execute(remap_constraint)
379376
log.info("Done remapping constraints! Going live with new table...")
380377
_update_progress(progress, 99.0)
381-
slack.verbose(
382-
f"`{table}`: Indices & constraints applied, finished refreshing table | "
383-
f"_Next: Elasticsearch reindex_"
384-
)
378+
slack.status(table, "Indices & constraints applied | _Next: table promotion_")
385379

386380
# Step 8: Promote the temporary table and delete the original
387381
go_live = get_go_live_query(table, index_mapping)
388382
log.info(f"Running go-live: \n{go_live.as_string(downstream_cur)}")
389383
downstream_cur.execute(go_live)
390-
slack.verbose(
391-
f"`{table}`: Finished table promotion | " f"_Next: Elasticsearch promotion_"
384+
slack.status(
385+
table, "Finished table promotion | _Next: Elasticsearch promotion_"
392386
)
393387

394388
downstream_db.close()

ingestion_server/ingestion_server/slack.py

+10
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,13 @@ def info(text: str, summary: str = None) -> None:
7070

7171
def error(text: str, summary: str = None) -> None:
7272
_message(text, summary, level=Level.ERROR)
73+
74+
75+
def status(model: str, text: str) -> None:
76+
"""
77+
Send a message regarding the status of the data refresh.
78+
79+
Model is required an all messages get prepended with the model.
80+
"""
81+
text = f"`{model}`: {text}"
82+
info(text, None)

justfile

+4
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ _ing-api data port="50281":
179179
@promote model="image" suffix="init" alias="image":
180180
just _ing-api '{"model": "{{ model }}", "action": "PROMOTE", "index_suffix": "{{ suffix }}", "alias": "{{ alias }}"}'
181181

182+
# Delete an index in Elasticsearch
183+
@delete model="image" suffix="init" alias="image":
184+
just _ing-api '{"model": "{{ model }}", "action": "DELETE_INDEX", "index_suffix": "{{ suffix }}"}'
185+
182186
# Run ingestion-server tests locally
183187
ing-testlocal *args:
184188
cd ingestion_server && pipenv run ./test/run_test.sh {{ args }}

0 commit comments

Comments
 (0)